diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index cbbf88b32393..7d5bd5eb7a94 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -272,14 +272,21 @@ All available procedures are listed below.
| trigger_tag_automatic_creation |
+ -- Use named argument
+ CALL [catalog.]sys.trigger_tag_automatic_creation(`table` => 'identifier')
+ CALL [catalog.]sys.trigger_tag_automatic_creation(`table` => 'identifier', force => false)
+ -- Use indexed argument
CALL [catalog.]sys.trigger_tag_automatic_creation('identifier')
+ CALL [catalog.]sys.trigger_tag_automatic_creation('identifier', false)
|
Trigger the tag automatic creation. Arguments:
table: the target table identifier. Cannot be empty.
+ force: force creating the auto-tag once tag.creation-delay has passed, even if no data exists. Default false.
|
- CALL sys.trigger_tag_automatic_creation(table => 'default.T')
+ CALL sys.trigger_tag_automatic_creation(table => 'default.T')
+ CALL sys.trigger_tag_automatic_creation(table => 'default.T', force => false)
|
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index c0abd0bae235..abf2936d8477 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -166,9 +166,11 @@ This section introduce all available spark procedures about paimon.
|
Trigger the tag automatic creation. Arguments:
table: the target table identifier. Cannot be empty.
+ force: force creating the auto-tag once tag.creation-delay has passed, even if no data exists. Default false.
|
- CALL sys.trigger_tag_automatic_creation(table => 'default.T')
+ CALL sys.trigger_tag_automatic_creation(table => 'default.T')
+ CALL sys.trigger_tag_automatic_creation(table => 'default.T', force => false)
|
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java
index 7ca794f59f0a..cfb8e22be8d2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java
@@ -18,8 +18,13 @@
package org.apache.paimon.flink.procedure;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.tag.TagAutoManager;
+import org.apache.paimon.tag.TagPeriodHandler;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
@@ -27,6 +32,9 @@
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.types.Row;
+import java.time.LocalDateTime;
+import java.util.Collections;
+
/**
* A procedure to trigger tag automatic creation for a table. Usage:
*
@@ -39,12 +47,37 @@ public class TriggerTagAutomaticCreationProcedure extends ProcedureBase {
public static final String IDENTIFIER = "trigger_tag_automatic_creation";
- @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
+ /**
+ * Triggers automatic tag creation for a table.
+ *
+ * @param procedureContext Flink procedure context (not read by this procedure)
+ * @param tableId identifier of the target table
+ * @param force optional; {@code null} (argument omitted) is treated as {@code false}. When true
+ * and the tag expected for the current time is still missing after the first attempt, an
+ * empty commit is made (if the auto-creation permits forcing a snapshot) and creation is retried
+ * @return a single row containing "Success"
+ * @throws Exception propagated from the table lookup, the commit or the tag creation
+ */
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "force", type = @DataTypeHint("BOOLEAN"), isOptional = true)
+ })
public @DataTypeHint("ROW") Row[] call(
- ProcedureContext procedureContext, String tableId) throws Exception {
- ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId)))
- .newTagAutoManager()
- .run();
+ ProcedureContext procedureContext, String tableId, Boolean force) throws Exception {
+ FileStoreTable fsTable =
+ ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId)));
+ String commitUser = CoreOptions.fromMap(fsTable.options()).createCommitUser();
+
+ try (TableCommitImpl commit = fsTable.newCommit(commitUser)) {
+ TagAutoManager tam = fsTable.newTagAutoManager();
+ // NOTE(review): presumably null when automatic tag creation is not configured - confirm
+ if (tam.getTagAutoCreation() != null) {
+ // First try
+ tam.run();
+
+ // If the expected tag was not created, try again after an empty commit.
+ // `force` is optional and arrives as null when omitted, so it must not be auto-unboxed.
+ if (Boolean.TRUE.equals(force)
+ && !isAutoTagExits(fsTable)
+ && tam.getTagAutoCreation().forceCreatingSnapshot()) {
+ ManifestCommittable committable =
+ new ManifestCommittable(Long.MAX_VALUE, null, Collections.emptyList());
+ // An empty committable would otherwise be skipped; disable that so it is committed
+ commit.ignoreEmptyCommit(false);
+ commit.commit(committable);
+ // Second try
+ tam.run();
+ }
+ }
+ }
+
return new Row[] {Row.of("Success")};
}
@@ -52,4 +85,14 @@ public class TriggerTagAutomaticCreationProcedure extends ProcedureBase {
public String identifier() {
return IDENTIFIER;
}
+
+ private boolean isAutoTagExits(FileStoreTable table) {
+ CoreOptions coreOptions = CoreOptions.fromMap(table.options());
+ TagPeriodHandler handler = TagPeriodHandler.create(coreOptions);
+
+ // Resolve the name of the tag covering the current time; that is the tag a forced
+ // snapshot is expected to produce.
+ String expectedTag =
+ handler.timeToTag(handler.normalizeToPreviousTag(LocalDateTime.now()));
+ return table.tagManager().tagExists(expectedTag);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java
index 8605b67be8ad..0780cda1b339 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java
@@ -29,15 +29,25 @@
public class TriggerTagAutomaticCreationProcedureITCase extends CatalogITCaseBase {
@Test
- public void testTriggerTagAutomaticCreation() {
+ public void testAutoTagWithData() {
+ testTriggerTagAutomaticCreation(true); // one row is inserted before tag creation is triggered
+ }
+
+ @Test
+ public void testAutoTagWithoutData() {
+ testTriggerTagAutomaticCreation(false); // no insert; the procedure is called with force => true
+ }
+
+ public void testTriggerTagAutomaticCreation(boolean haveData) {
sql(
"CREATE TABLE T (id INT, name STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED)"
+ " WITH ('bucket'='1')");
- sql("INSERT INTO T VALUES (1, 'a')");
- assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
-
+ if (haveData) {
+ sql("INSERT INTO T VALUES (1, 'a')");
+ assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
+ }
assertThat(sql("select tag_name from `T$tags`").stream().map(Row::toString))
.isNullOrEmpty();
@@ -45,10 +55,11 @@ public void testTriggerTagAutomaticCreation() {
"alter table T set ("
+ "'tag.automatic-creation'='process-time',"
+ "'tag.creation-period'='daily',"
- + "'tag.creation-delay'='10 m',"
+ + "'tag.creation-delay'='0 m',"
+ "'tag.num-retained-max'='90')");
- sql("CALL sys.trigger_tag_automatic_creation(`table` => 'default.T')");
+ sql("CALL sys.trigger_tag_automatic_creation(`table` => 'default.T', force => true)");
+
assertThat(sql("select tag_name from `T$tags`").stream().map(Row::toString))
.isNotNull()
.isNotEmpty();
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java
index f7ca06f20401..9270c11eef4a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java
@@ -18,7 +18,12 @@
package org.apache.paimon.spark.procedure;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.tag.TagAutoManager;
+import org.apache.paimon.tag.TagPeriodHandler;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -28,13 +33,20 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import java.time.LocalDateTime;
+import java.util.Collections;
+
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;
/** A procedure to trigger the tag automatic creation for a table. */
public class TriggerTagAutomaticCreationProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ ProcedureParameter.optional("force", BooleanType)
+ };
private static final StructType OUTPUT_TYPE =
new StructType(
@@ -59,11 +71,32 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ boolean force = !args.isNullAt(1) && args.getBoolean(1); // a null (omitted) force counts as false
return modifyPaimonTable(
tableIdent,
table -> {
- try {
- ((FileStoreTable) table).newTagAutoManager().run();
+ FileStoreTable fsTable = ((FileStoreTable) table);
+ String commitUser = CoreOptions.fromMap(table.options()).createCommitUser();
+
+ try (TableCommitImpl commit = fsTable.newCommit(commitUser)) {
+ TagAutoManager tam = fsTable.newTagAutoManager();
+ if (tam.getTagAutoCreation() != null) {
+ // First try
+ tam.run();
+ // If the expected tag was not created, try again after an empty commit
+ if (force
+ && !isAutoTagExits(fsTable)
+ && tam.getTagAutoCreation().forceCreatingSnapshot()) {
+ ManifestCommittable committable =
+ new ManifestCommittable(
+ Long.MAX_VALUE, null, Collections.emptyList());
+ commit.ignoreEmptyCommit(false);
+ commit.commit(committable);
+ // Second try
+ tam.run();
+ }
+ }
+
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -72,6 +105,16 @@ public InternalRow[] call(InternalRow args) {
});
}
+ private boolean isAutoTagExits(FileStoreTable table) {
+ CoreOptions coreOptions = CoreOptions.fromMap(table.options());
+ TagPeriodHandler handler = TagPeriodHandler.create(coreOptions);
+
+ // Resolve the name of the tag covering the current time; that is the tag a forced
+ // snapshot is expected to produce.
+ String expectedTag =
+ handler.timeToTag(handler.normalizeToPreviousTag(LocalDateTime.now()));
+ return table.tagManager().tagExists(expectedTag);
+ }
+
public static ProcedureBuilder builder() {
return new Builder() {
@Override
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala
index 422cf230601a..d7ae8e32a1a6 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala
@@ -25,28 +25,37 @@ import org.assertj.core.api.Assertions.assertThat
class TriggerTagAutomaticCreationProcedureTest extends PaimonSparkTestBase {
- test("Paimon procedure: trigger tag automatic creation test") {
+ test("Paimon procedure: trigger tag automatic creation test with data") {
+ autoTagTest(true); // one row is inserted before tag creation is triggered
+ }
+
+ test("Paimon procedure: trigger tag automatic creation test without data") {
+ autoTagTest(false); // no insert; the procedure is called with force => true
+ }
+
+ def autoTagTest(haveData: Boolean) = {
spark.sql("""CREATE TABLE T (id INT, name STRING)
|USING PAIMON
|TBLPROPERTIES (
|'primary-key'='id'
|)""".stripMargin)
- spark.sql("insert into T values(1, 'a')")
-
val table = loadTable("T")
- assertResult(1)(table.snapshotManager().snapshotCount())
+ if (haveData) {
+ spark.sql("insert into T values(1, 'a')")
+ assertResult(1)(table.snapshotManager().snapshotCount())
+ }
assertResult(0)(spark.sql("show tags T").count())
spark.sql("""alter table T set tblproperties(
|'tag.automatic-creation'='process-time',
|'tag.creation-period'='daily',
- |'tag.creation-delay'='10 m',
+ |'tag.creation-delay'='0 m',
|'tag.num-retained-max'='90'
|)""".stripMargin)
- spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T')")
+ spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T', force => true)")
assertResult(1)(spark.sql("show tags T").count())
assertResult(
spark