diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java new file mode 100644 index 000000000..c108fc374 --- /dev/null +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java @@ -0,0 +1,112 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import io.cdap.plugin.db.sink.ETLDBOutputFormat; + +/** + * Class that extends {@link ETLDBOutputFormat} to implement the abstract methods + */ +public class OracleETLDBOutputFormat extends ETLDBOutputFormat { + + /** + * This method is used to construct the upsert query for Oracle using MERGE statement. + * Example - MERGE INTO my_table target + * USING (SELECT ? AS id, ? AS name, ? AS age FROM dual) source + * ON (target.id = source.id) + * WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age + * WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (source.id, source.name, source.age) + * @param table - Name of the table + * @param fieldNames - All the columns of the table + * @param listKeys - The columns used as keys for matching + * @return Upsert query in the form of string + */ + @Override + public String constructUpsertQuery(String table, String[] fieldNames, String[] listKeys) { + if (listKeys == null) { + throw new IllegalArgumentException("Column names to be updated should not be null"); + } else if (fieldNames == null) { + throw new IllegalArgumentException("Field names should not be null"); + } else { + StringBuilder query = new StringBuilder(); + + // MERGE INTO target_table target + query.append("MERGE INTO ").append(table).append(" target "); + + // USING (SELECT ? AS col1, ? AS col2, ... FROM dual) source + query.append("USING (SELECT "); + for (int i = 0; i < fieldNames.length; ++i) { + query.append("? AS ").append(fieldNames[i]); + if (i != fieldNames.length - 1) { + query.append(", "); + } + } + query.append(" FROM dual) source "); + + // ON (target.key1 = source.key1 AND target.key2 = source.key2 ...) + query.append("ON ("); + for (int i = 0; i < listKeys.length; ++i) { + query.append("target.").append(listKeys[i]).append(" = source.").append(listKeys[i]); + if (i != listKeys.length - 1) { + query.append(" AND "); + } + } + query.append(") "); + + // WHEN MATCHED THEN UPDATE SET target.col1 = source.col1, target.col2 = source.col2 ... + // Only update non-key columns + query.append("WHEN MATCHED THEN UPDATE SET "); + boolean firstUpdateColumn = true; + for (int i = 0; i < fieldNames.length; ++i) { + // Skip key columns in the UPDATE SET clause + boolean isKeyColumn = false; + for (String listKey : listKeys) { + if (listKey.equals(fieldNames[i])) { + isKeyColumn = true; + break; + } + } + if (!isKeyColumn) { + if (!firstUpdateColumn) { + query.append(", "); + } + query.append("target.").append(fieldNames[i]).append(" = source.").append(fieldNames[i]); + firstUpdateColumn = false; + } + } + + // WHEN NOT MATCHED THEN INSERT (col1, col2, ...) VALUES (source.col1, source.col2, ...) + query.append(" WHEN NOT MATCHED THEN INSERT ("); + for (int i = 0; i < fieldNames.length; ++i) { + query.append(fieldNames[i]); + if (i != fieldNames.length - 1) { + query.append(", "); + } + } + query.append(") VALUES ("); + for (int i = 0; i < fieldNames.length; ++i) { + query.append("source.").append(fieldNames[i]); + if (i != fieldNames.length - 1) { + query.append(", "); + } + } + query.append(")"); + + return query.toString(); + } + } +} diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java index 40ecfbe9e..fca1b72bf 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java @@ -23,6 +23,7 @@ import io.cdap.cdap.api.annotation.MetadataProperty; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.batch.Output; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.batch.BatchSink; @@ -31,6 +32,7 @@ import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider; import io.cdap.plugin.common.db.DBErrorDetailsProvider; import io.cdap.plugin.db.DBRecord; import io.cdap.plugin.db.SchemaReader; @@ -72,6 +74,13 @@ protected FieldsValidator getFieldsValidator() { protected SchemaReader getSchemaReader() { return new OracleSinkSchemaReader(); } + + @Override + protected void addOutputContext(BatchSinkContext context) { + context.addOutput(Output.of(oracleSinkConfig.getReferenceName(), + new SinkOutputFormatProvider(OracleETLDBOutputFormat.class, getConfiguration()))); + } + @Override protected LineageRecorder getLineageRecorder(BatchSinkContext context) { String fqn = DBUtils.constructFQN("oracle", diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleETLDBOutputFormatTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleETLDBOutputFormatTest.java new file mode 100644 index 000000000..02a49ae6c --- /dev/null +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleETLDBOutputFormatTest.java @@ -0,0 +1,128 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import org.junit.Assert; +import org.junit.Test; + +public class OracleETLDBOutputFormatTest { + + private final OracleETLDBOutputFormat outputFormat = new OracleETLDBOutputFormat(); + + @Test + public void testConstructUpsertQueryBasic() { + String[] fieldNames = {"id", "name", "age"}; + String[] listKeys = {"id"}; + String table = "my_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO my_table target " + + "USING (SELECT ? AS id, ? AS name, ? AS age FROM dual) source " + + "ON (target.id = source.id) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age " + + "WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (source.id, source.name, source.age)"; + + Assert.assertEquals(expected, result); + } + + @Test + public void testConstructUpsertQueryMultipleKeys() { + String[] fieldNames = {"id", "code", "name", "value"}; + String[] listKeys = {"id", "code"}; + String table = "composite_key_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO composite_key_table target " + + "USING (SELECT ? AS id, ? AS code, ? AS name, ? AS value FROM dual) source " + + "ON (target.id = source.id AND target.code = source.code) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name, target.value = source.value " + + "WHEN NOT MATCHED THEN INSERT (id, code, name, value) VALUES (source.id, source.code, source.name, source.value)"; + + Assert.assertEquals(expected, result); + } + + @Test + public void testConstructUpsertQuerySingleField() { + String[] fieldNames = {"id", "name"}; + String[] listKeys = {"id"}; + String table = "single_field_update_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO single_field_update_table target " + + "USING (SELECT ? AS id, ? AS name FROM dual) source " + + "ON (target.id = source.id) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name " + + "WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name)"; + + Assert.assertEquals(expected, result); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructUpsertQueryNullListKeys() { + String[] fieldNames = {"id", "name", "age"}; + String table = "my_table"; + + outputFormat.constructUpsertQuery(table, fieldNames, null); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructUpsertQueryNullFieldNames() { + String[] listKeys = {"id"}; + String table = "my_table"; + + outputFormat.constructUpsertQuery(table, null, listKeys); + } + + @Test + public void testConstructUpsertQueryAllFieldsAreKeys() { + String[] fieldNames = {"id", "code"}; + String[] listKeys = {"id", "code"}; + String table = "all_keys_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + // When all fields are keys, the UPDATE SET clause will be empty after "SET " + // Note: There's an extra space before "WHEN NOT MATCHED" due to implementation + String expected = "MERGE INTO all_keys_table target " + + "USING (SELECT ? AS id, ? AS code FROM dual) source " + + "ON (target.id = source.id AND target.code = source.code) " + + "WHEN MATCHED THEN UPDATE SET " + + "WHEN NOT MATCHED THEN INSERT (id, code) VALUES (source.id, source.code)"; + + Assert.assertEquals(expected, result); + } + + @Test + public void testConstructUpsertQueryWithSpecialTableName() { + String[] fieldNames = {"id", "name"}; + String[] listKeys = {"id"}; + String table = "SCHEMA.MY_TABLE"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO SCHEMA.MY_TABLE target " + + "USING (SELECT ? AS id, ? AS name FROM dual) source " + + "ON (target.id = source.id) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name " + + "WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name)"; + + Assert.assertEquals(expected, result); + } +} diff --git a/oracle-plugin/widgets/Oracle-batchsink.json b/oracle-plugin/widgets/Oracle-batchsink.json index 8d8fc79a2..ad401bd89 100644 --- a/oracle-plugin/widgets/Oracle-batchsink.json +++ b/oracle-plugin/widgets/Oracle-batchsink.json @@ -192,6 +192,29 @@ "label": "Schema Name", "name": "dbSchemaName" }, + { + "widget-type": "radio-group", + "label": "Operation Name", + "name": "operationName", + "widget-attributes": { + "default": "insert", + "layout": "inline", + "options": [ + { + "id": "insert", + "label": "INSERT" + }, + { + "id": "update", + "label": "UPDATE" + }, + { + "id": "upsert", + "label": "UPSERT" + } + ] + } + }, { "widget-type": "hidden", "label": "Operation Name",