diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index 67060c6002b5..9ac2f4e17b7d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -94,8 +94,8 @@ public void testOPCUAServerSink() throws Exception { final Map sinkAttributes = new HashMap<>(); sinkAttributes.put("sink", "opc-ua-sink"); - sinkAttributes.put("opcua.model", "client-server"); - sinkAttributes.put("security-policy", "None"); + sinkAttributes.put("model", "client-server"); + sinkAttributes.put("opcua.security-policy", "None"); OpcUaClient opcUaClient; DataValue value; @@ -103,7 +103,7 @@ public void testOPCUAServerSink() throws Exception { final int[] ports = EnvUtils.searchAvailablePorts(); tcpPort = ports[0]; httpsPort = ports[1]; - sinkAttributes.put("tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); sinkAttributes.put("https.port", Integer.toString(httpsPort)); Assert.assertEquals( @@ -146,7 +146,7 @@ public void testOPCUAServerSink() throws Exception { final int[] ports = EnvUtils.searchAvailablePorts(); tcpPort = ports[0]; httpsPort = ports[1]; - sinkAttributes.put("tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); sinkAttributes.put("https.port", Integer.toString(httpsPort)); sinkAttributes.put("with-quality", "true"); diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index 989590f40a6b..c4e7edd63d0c 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -60,7 +60,9 @@ public Map getAttribute() { } public boolean hasAttribute(final String key) { - return attributes.containsKey(key) || attributes.containsKey(KeyReducer.reduce(key)); + return attributes.containsKey(key) + || attributes.containsKey(KeyReducer.shallowReduce(key)) + || attributes.containsKey(KeyReducer.reduce(key)); } public boolean hasAnyAttributes(final String... keys) { @@ -92,7 +94,11 @@ public void computeAttributeIfExists( } public String getString(final String key) { - final String value = attributes.get(key); + String value = attributes.get(key); + if (Objects.nonNull(value)) { + return value; + } + value = attributes.get(KeyReducer.shallowReduce(key)); return value != null ? value : attributes.get(KeyReducer.reduce(key)); } @@ -380,6 +386,19 @@ private static class KeyReducer { SECOND_PREFIXES.add("opcua."); } + static String shallowReduce(String key) { + if (key == null) { + return null; + } + final String lowerCaseKey = key.toLowerCase(); + for (final String prefix : FIRST_PREFIXES) { + if (lowerCaseKey.startsWith(prefix)) { + return key.substring(prefix.length()); + } + } + return key; + } + static String reduce(String key) { if (key == null) { return null; diff --git a/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java b/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java index bf85b5ff9127..d16b8e2879d4 100644 --- a/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java +++ b/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java @@ -29,10 +29,10 @@ public class PipeParametersTest { @Test public void keyReducerTest() { final PipeParameters parameters = new PipeParameters(new HashMap<>()); - parameters.addAttribute("sink.opcua.with-quality", "false"); + parameters.addAttribute("sink.opcua.with-quality", "true"); - Assert.assertEquals(false, parameters.getBoolean("with-quality")); - Assert.assertEquals(false, parameters.getBoolean("opcua.with-quality")); + Assert.assertEquals(true, parameters.getBoolean("with-quality")); + Assert.assertEquals(true, parameters.getBoolean("opcua.with-quality")); // Invalid parameters.addAttribute("sink.source.opcua.value-name", "false"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java index bf96d9881807..c6d8da47878e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java @@ -242,7 +242,9 @@ public List getNodesToAdd( } public void disconnect() throws Exception { - client.disconnect().get(); + if (Objects.nonNull(client)) { + client.disconnect().get(); + } } /////////////////////////////// Getter /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java index 3f59f9edf68e..842281daa966 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java @@ -383,8 +383,16 @@ private long getTimeSeriesNumber(ISchemaRegion schemaRegion) { DataNodeTableCache.getInstance() .getTable( PathUtils.unQualifyDatabaseName(schemaRegion.getDatabaseFullPath()), - tableEntry.getKey()); - return Objects.nonNull(table) ? table.getFieldNum() * tableEntry.getValue() : 0; + tableEntry.getKey(), + false); + if (Objects.isNull(table)) { + logger.warn( + "Failed to get table {}.{} when calculating the time series number. Maybe the cluster is restarting or the table is being dropped.", + PathUtils.unQualifyDatabaseName(schemaRegion.getDatabaseFullPath()), + tableEntry.getKey()); + return 0L; + } + return table.getFieldNum() * tableEntry.getValue(); }) .reduce(0L, Long::sum); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 082106bb5fd6..83372a5a6c0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -32,6 +32,7 @@ import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -92,9 +93,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.iotdb.db.utils.constant.SqlConstant.ROOT; -import static org.apache.iotdb.db.utils.constant.SqlConstant.TREE_MODEL_DATABASE_PREFIX; - /** * {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link * LoadCommand}. This class turn the content of a piece of loading TsFile into a new TsFile. When @@ -489,7 +487,7 @@ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) final String tableName = chunkData.getDevice() != null ? chunkData.getDevice().getTableName() : null; if (tableName != null - && !(tableName.startsWith(TREE_MODEL_DATABASE_PREFIX) || tableName.equals(ROOT))) { + && PathUtils.isTableModelDatabase(partitionInfo.getDataRegion().getDatabaseName())) { // If the table does not exist, it means that the table is all deleted by mods final TsTable table = DataNodeTableCache.getInstance()