Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,21 @@ All available procedures are listed below.
<tr>
<td>trigger_tag_automatic_creation</td>
<td>
-- Use named argument
CALL [catalog.]sys.trigger_tag_automatic_creation(`table` => 'identifier')
CALL [catalog.]sys.trigger_tag_automatic_creation(`table` => 'identifier', force => false)<br/><br/>
-- Use indexed argument
CALL [catalog.]sys.trigger_tag_automatic_creation('identifier')
CALL [catalog.]sys.trigger_tag_automatic_creation('identifier', false)
</td>
<td>
Trigger the tag automatic creation. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>force: force creating the auto-tag when it's after tag.creation-delay even no data exits. Default false.</li>
</td>
<td>
CALL sys.trigger_tag_automatic_creation(table => 'default.T')
CALL sys.trigger_tag_automatic_creation(table => 'default.T')<br/>
CALL sys.trigger_tag_automatic_creation(table => 'default.T', force => false)
</td>
</tr>
<tr>
Expand Down
4 changes: 3 additions & 1 deletion docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,11 @@ This section introduce all available spark procedures about paimon.
<td>
Trigger the tag automatic creation. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>force: force creating the auto-tag when it's after tag.creation-delay even no data exits. Default false.</li>
</td>
<td>
CALL sys.trigger_tag_automatic_creation(table => 'default.T')
CALL sys.trigger_tag_automatic_creation(table => 'default.T')<br/><br/>
CALL sys.trigger_tag_automatic_creation(table => 'default.T', force => false)
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@

package org.apache.paimon.flink.procedure;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.tag.TagPeriodHandler;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.types.Row;

import java.time.LocalDateTime;
import java.util.Collections;

/**
* A procedure to trigger tag automatic creation for a table. Usage:
*
Expand All @@ -39,17 +47,52 @@ public class TriggerTagAutomaticCreationProcedure extends ProcedureBase {

public static final String IDENTIFIER = "trigger_tag_automatic_creation";

@ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "force", type = @DataTypeHint("BOOLEAN"), isOptional = true)
})
public @DataTypeHint("ROW<result STRING>") Row[] call(
ProcedureContext procedureContext, String tableId) throws Exception {
((FileStoreTable) catalog.getTable(Identifier.fromString(tableId)))
.newTagAutoManager()
.run();
ProcedureContext procedureContext, String tableId, Boolean force) throws Exception {
FileStoreTable fsTable =
((FileStoreTable) catalog.getTable(Identifier.fromString(tableId)));
String commitUser = CoreOptions.fromMap(fsTable.options()).createCommitUser();

try (TableCommitImpl commit = fsTable.newCommit(commitUser)) {
TagAutoManager tam = fsTable.newTagAutoManager();
if (tam.getTagAutoCreation() != null) {
// Fist try
tam.run();

// If the expected tag not created, try again with an empty commit
if (force
&& !isAutoTagExits(fsTable)
&& tam.getTagAutoCreation().forceCreatingSnapshot()) {
ManifestCommittable committable =
new ManifestCommittable(Long.MAX_VALUE, null, Collections.emptyList());
commit.ignoreEmptyCommit(false);
commit.commit(committable);
// Second try
tam.run();
}
}
}

return new Row[] {Row.of("Success")};
}

@Override
public String identifier() {
return IDENTIFIER;
}

private boolean isAutoTagExits(FileStoreTable table) {
TagPeriodHandler periodHandler =
TagPeriodHandler.create(CoreOptions.fromMap(table.options()));

// With forceCreatingSnapshot, the auto-tag should exist for "now"
LocalDateTime tagTime = periodHandler.normalizeToPreviousTag(LocalDateTime.now());
String tagName = periodHandler.timeToTag(tagTime);
return table.tagManager().tagExists(tagName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,37 @@
public class TriggerTagAutomaticCreationProcedureITCase extends CatalogITCaseBase {

@Test
public void testTriggerTagAutomaticCreation() {
public void testAutoTagWithData() {
testTriggerTagAutomaticCreation(true);
}

@Test
public void testAutoTagWithoutData() {
testTriggerTagAutomaticCreation(false);
}

public void testTriggerTagAutomaticCreation(boolean haveData) {
sql(
"CREATE TABLE T (id INT, name STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED)"
+ " WITH ('bucket'='1')");

sql("INSERT INTO T VALUES (1, 'a')");
assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));

if (haveData) {
sql("INSERT INTO T VALUES (1, 'a')");
assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
}
assertThat(sql("select tag_name from `T$tags`").stream().map(Row::toString))
.isNullOrEmpty();

sql(
"alter table T set ("
+ "'tag.automatic-creation'='process-time',"
+ "'tag.creation-period'='daily',"
+ "'tag.creation-delay'='10 m',"
+ "'tag.creation-delay'='0 m',"
+ "'tag.num-retained-max'='90')");

sql("CALL sys.trigger_tag_automatic_creation(`table` => 'default.T')");
sql("CALL sys.trigger_tag_automatic_creation(`table` => 'default.T', force => true)");

assertThat(sql("select tag_name from `T$tags`").stream().map(Row::toString))
.isNotNull()
.isNotEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.tag.TagPeriodHandler;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
Expand All @@ -28,13 +33,20 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.time.LocalDateTime;
import java.util.Collections;

import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/** A procedure to trigger the tag automatic creation for a table. */
public class TriggerTagAutomaticCreationProcedure extends BaseProcedure {

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.optional("force", BooleanType)
};

private static final StructType OUTPUT_TYPE =
new StructType(
Expand All @@ -59,11 +71,32 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
boolean force = !args.isNullAt(1) && args.getBoolean(1);
return modifyPaimonTable(
tableIdent,
table -> {
try {
((FileStoreTable) table).newTagAutoManager().run();
FileStoreTable fsTable = ((FileStoreTable) table);
String commitUser = CoreOptions.fromMap(table.options()).createCommitUser();

try (TableCommitImpl commit = fsTable.newCommit(commitUser)) {
TagAutoManager tam = fsTable.newTagAutoManager();
if (tam.getTagAutoCreation() != null) {
// Fist try
tam.run();
// If the expected tag not created, try again with an empty commit
if (force
&& !isAutoTagExits(fsTable)
&& tam.getTagAutoCreation().forceCreatingSnapshot()) {
ManifestCommittable committable =
new ManifestCommittable(
Long.MAX_VALUE, null, Collections.emptyList());
commit.ignoreEmptyCommit(false);
commit.commit(committable);
// Second try
tam.run();
}
}

} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -72,6 +105,16 @@ public InternalRow[] call(InternalRow args) {
});
}

private boolean isAutoTagExits(FileStoreTable table) {
TagPeriodHandler periodHandler =
TagPeriodHandler.create(CoreOptions.fromMap(table.options()));

// With forceCreatingSnapshot, the auto-tag should exist for "now"
LocalDateTime tagTime = periodHandler.normalizeToPreviousTag(LocalDateTime.now());
String tagName = periodHandler.timeToTag(tagTime);
return table.tagManager().tagExists(tagName);
}

public static ProcedureBuilder builder() {
return new Builder<TriggerTagAutomaticCreationProcedure>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,37 @@ import org.assertj.core.api.Assertions.assertThat

class TriggerTagAutomaticCreationProcedureTest extends PaimonSparkTestBase {

test("Paimon procedure: trigger tag automatic creation test") {
test("Paimon procedure: trigger tag automatic creation test with data") {
autoTagTest(true);
}

test("Paimon procedure: trigger tag automatic creation test without data") {
autoTagTest(false);
}

def autoTagTest(haveData: Boolean) = {
spark.sql("""CREATE TABLE T (id INT, name STRING)
|USING PAIMON
|TBLPROPERTIES (
|'primary-key'='id'
|)""".stripMargin)

spark.sql("insert into T values(1, 'a')")

val table = loadTable("T")
assertResult(1)(table.snapshotManager().snapshotCount())
if (haveData) {
spark.sql("insert into T values(1, 'a')")
assertResult(1)(table.snapshotManager().snapshotCount())
}

assertResult(0)(spark.sql("show tags T").count())

spark.sql("""alter table T set tblproperties(
|'tag.automatic-creation'='process-time',
|'tag.creation-period'='daily',
|'tag.creation-delay'='10 m',
|'tag.creation-delay'='0 m',
|'tag.num-retained-max'='90'
|)""".stripMargin)

spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T')")
spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T', force => true)")
assertResult(1)(spark.sql("show tags T").count())
assertResult(
spark
Expand Down