From 74bdcd674a0a671e7933d04c370040771d200f3b Mon Sep 17 00:00:00 2001 From: "li.pc" Date: Wed, 4 Feb 2026 17:45:01 +0800 Subject: [PATCH] [tag] Force creating auto-tag even no data when triggering --- docs/content/flink/procedures.md | 9 +++- docs/content/spark/procedures.md | 4 +- .../TriggerTagAutomaticCreationProcedure.java | 53 +++++++++++++++++-- ...erTagAutomaticCreationProcedureITCase.java | 23 +++++--- .../TriggerTagAutomaticCreationProcedure.java | 49 +++++++++++++++-- ...gerTagAutomaticCreationProcedureTest.scala | 21 +++++--- 6 files changed, 137 insertions(+), 22 deletions(-) diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index cbbf88b32393..7d5bd5eb7a94 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -272,14 +272,21 @@ All available procedures are listed below. trigger_tag_automatic_creation + -- Use named argument + CALL [catalog.]sys.trigger_tag_automatic_creation(`table` => 'identifier') + CALL [catalog.]sys.trigger_tag_automatic_creation(`table` => 'identifier', force => false)

+ -- Use indexed argument CALL [catalog.]sys.trigger_tag_automatic_creation('identifier') + CALL [catalog.]sys.trigger_tag_automatic_creation('identifier', false) Trigger the tag automatic creation. Arguments:
  • table: the target table identifier. Cannot be empty.
  • +
  • force: force creating the auto-tag when it's after tag.creation-delay even no data exits. Default false.
  • - CALL sys.trigger_tag_automatic_creation(table => 'default.T') + CALL sys.trigger_tag_automatic_creation(table => 'default.T')
    + CALL sys.trigger_tag_automatic_creation(table => 'default.T', force => false) diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index c0abd0bae235..abf2936d8477 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -166,9 +166,11 @@ This section introduce all available spark procedures about paimon. Trigger the tag automatic creation. Arguments:
  • table: the target table identifier. Cannot be empty.
  • +
  • force: force creating the auto-tag when it's after tag.creation-delay even no data exits. Default false.
  • - CALL sys.trigger_tag_automatic_creation(table => 'default.T') + CALL sys.trigger_tag_automatic_creation(table => 'default.T')

    + CALL sys.trigger_tag_automatic_creation(table => 'default.T', force => false) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java index 7ca794f59f0a..cfb8e22be8d2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java @@ -18,8 +18,13 @@ package org.apache.paimon.flink.procedure; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.tag.TagAutoManager; +import org.apache.paimon.tag.TagPeriodHandler; import org.apache.flink.table.annotation.ArgumentHint; import org.apache.flink.table.annotation.DataTypeHint; @@ -27,6 +32,9 @@ import org.apache.flink.table.procedure.ProcedureContext; import org.apache.flink.types.Row; +import java.time.LocalDateTime; +import java.util.Collections; + /** * A procedure to trigger tag automatic creation for a table. Usage: * @@ -39,12 +47,37 @@ public class TriggerTagAutomaticCreationProcedure extends ProcedureBase { public static final String IDENTIFIER = "trigger_tag_automatic_creation"; - @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))}) + @ProcedureHint( + argument = { + @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "force", type = @DataTypeHint("BOOLEAN"), isOptional = true) + }) public @DataTypeHint("ROW") Row[] call( - ProcedureContext procedureContext, String tableId) throws Exception { - ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))) - .newTagAutoManager() - .run(); + ProcedureContext procedureContext, String tableId, Boolean force) throws Exception { + FileStoreTable fsTable = + ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))); + String commitUser = CoreOptions.fromMap(fsTable.options()).createCommitUser(); + + try (TableCommitImpl commit = fsTable.newCommit(commitUser)) { + TagAutoManager tam = fsTable.newTagAutoManager(); + if (tam.getTagAutoCreation() != null) { + // Fist try + tam.run(); + + // If the expected tag not created, try again with an empty commit + if (force + && !isAutoTagExits(fsTable) + && tam.getTagAutoCreation().forceCreatingSnapshot()) { + ManifestCommittable committable = + new ManifestCommittable(Long.MAX_VALUE, null, Collections.emptyList()); + commit.ignoreEmptyCommit(false); + commit.commit(committable); + // Second try + tam.run(); + } + } + } + return new Row[] {Row.of("Success")}; } @@ -52,4 +85,14 @@ public class TriggerTagAutomaticCreationProcedure extends ProcedureBase { public String identifier() { return IDENTIFIER; } + + private boolean isAutoTagExits(FileStoreTable table) { + TagPeriodHandler periodHandler = + TagPeriodHandler.create(CoreOptions.fromMap(table.options())); + + // With forceCreatingSnapshot, the auto-tag should exist for "now" + LocalDateTime tagTime = periodHandler.normalizeToPreviousTag(LocalDateTime.now()); + String tagName = periodHandler.timeToTag(tagTime); + return table.tagManager().tagExists(tagName); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java index 8605b67be8ad..0780cda1b339 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java @@ -29,15 +29,25 @@ public class TriggerTagAutomaticCreationProcedureITCase extends CatalogITCaseBase { @Test - public void testTriggerTagAutomaticCreation() { + public void testAutoTagWithData() { + testTriggerTagAutomaticCreation(true); + } + + @Test + public void testAutoTagWithoutData() { + testTriggerTagAutomaticCreation(false); + } + + public void testTriggerTagAutomaticCreation(boolean haveData) { sql( "CREATE TABLE T (id INT, name STRING," + " PRIMARY KEY (id) NOT ENFORCED)" + " WITH ('bucket'='1')"); - sql("INSERT INTO T VALUES (1, 'a')"); - assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a")); - + if (haveData) { + sql("INSERT INTO T VALUES (1, 'a')"); + assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a")); + } assertThat(sql("select tag_name from `T$tags`").stream().map(Row::toString)) .isNullOrEmpty(); @@ -45,10 +55,11 @@ public void testTriggerTagAutomaticCreation() { "alter table T set (" + "'tag.automatic-creation'='process-time'," + "'tag.creation-period'='daily'," - + "'tag.creation-delay'='10 m'," + + "'tag.creation-delay'='0 m'," + "'tag.num-retained-max'='90')"); - sql("CALL sys.trigger_tag_automatic_creation(`table` => 'default.T')"); + sql("CALL sys.trigger_tag_automatic_creation(`table` => 'default.T', force => true)"); + assertThat(sql("select tag_name from `T$tags`").stream().map(Row::toString)) .isNotNull() .isNotEmpty(); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java index f7ca06f20401..9270c11eef4a 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java @@ -18,7 +18,12 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.tag.TagAutoManager; +import org.apache.paimon.tag.TagPeriodHandler; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; @@ -28,13 +33,20 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.time.LocalDateTime; +import java.util.Collections; + +import static org.apache.spark.sql.types.DataTypes.BooleanType; import static org.apache.spark.sql.types.DataTypes.StringType; /** A procedure to trigger the tag automatic creation for a table. */ public class TriggerTagAutomaticCreationProcedure extends BaseProcedure { private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] {ProcedureParameter.required("table", StringType)}; + new ProcedureParameter[] { + ProcedureParameter.required("table", StringType), + ProcedureParameter.optional("force", BooleanType) + }; private static final StructType OUTPUT_TYPE = new StructType( @@ -59,11 +71,32 @@ public StructType outputType() { @Override public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + boolean force = !args.isNullAt(1) && args.getBoolean(1); return modifyPaimonTable( tableIdent, table -> { - try { - ((FileStoreTable) table).newTagAutoManager().run(); + FileStoreTable fsTable = ((FileStoreTable) table); + String commitUser = CoreOptions.fromMap(table.options()).createCommitUser(); + + try (TableCommitImpl commit = fsTable.newCommit(commitUser)) { + TagAutoManager tam = fsTable.newTagAutoManager(); + if (tam.getTagAutoCreation() != null) { + // Fist try + tam.run(); + // If the expected tag not created, try again with an empty commit + if (force + && !isAutoTagExits(fsTable) + && tam.getTagAutoCreation().forceCreatingSnapshot()) { + ManifestCommittable committable = + new ManifestCommittable( + Long.MAX_VALUE, null, Collections.emptyList()); + commit.ignoreEmptyCommit(false); + commit.commit(committable); + // Second try + tam.run(); + } + } + } catch (Exception e) { throw new RuntimeException(e); } @@ -72,6 +105,16 @@ public InternalRow[] call(InternalRow args) { }); } + private boolean isAutoTagExits(FileStoreTable table) { + TagPeriodHandler periodHandler = + TagPeriodHandler.create(CoreOptions.fromMap(table.options())); + + // With forceCreatingSnapshot, the auto-tag should exist for "now" + LocalDateTime tagTime = periodHandler.normalizeToPreviousTag(LocalDateTime.now()); + String tagName = periodHandler.timeToTag(tagTime); + return table.tagManager().tagExists(tagName); + } + public static ProcedureBuilder builder() { return new Builder() { @Override diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala index 422cf230601a..d7ae8e32a1a6 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala @@ -25,28 +25,37 @@ import org.assertj.core.api.Assertions.assertThat class TriggerTagAutomaticCreationProcedureTest extends PaimonSparkTestBase { - test("Paimon procedure: trigger tag automatic creation test") { + test("Paimon procedure: trigger tag automatic creation test with data") { + autoTagTest(true); + } + + test("Paimon procedure: trigger tag automatic creation test without data") { + autoTagTest(false); + } + + def autoTagTest(haveData: Boolean) = { spark.sql("""CREATE TABLE T (id INT, name STRING) |USING PAIMON |TBLPROPERTIES ( |'primary-key'='id' |)""".stripMargin) - spark.sql("insert into T values(1, 'a')") - val table = loadTable("T") - assertResult(1)(table.snapshotManager().snapshotCount()) + if (haveData) { + spark.sql("insert into T values(1, 'a')") + assertResult(1)(table.snapshotManager().snapshotCount()) + } assertResult(0)(spark.sql("show tags T").count()) spark.sql("""alter table T set tblproperties( |'tag.automatic-creation'='process-time', |'tag.creation-period'='daily', - |'tag.creation-delay'='10 m', + |'tag.creation-delay'='0 m', |'tag.num-retained-max'='90' |)""".stripMargin) - spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T')") + spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T', force => true)") assertResult(1)(spark.sql("show tags T").count()) assertResult( spark