From 015e9912f3fd91fe7fcf9c83c44e38c026cd165f Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 16:20:32 +0800 Subject: [PATCH 1/7] root-fix --- .../iotdb/db/storageengine/load/LoadTsFileManager.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 082106bb5fd64..83372a5a6c0b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -32,6 +32,7 @@ import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -92,9 +93,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.iotdb.db.utils.constant.SqlConstant.ROOT; -import static org.apache.iotdb.db.utils.constant.SqlConstant.TREE_MODEL_DATABASE_PREFIX; - /** * {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link * LoadCommand}. This class turn the content of a piece of loading TsFile into a new TsFile. When @@ -489,7 +487,7 @@ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) final String tableName = chunkData.getDevice() != null ? chunkData.getDevice().getTableName() : null; if (tableName != null - && !(tableName.startsWith(TREE_MODEL_DATABASE_PREFIX) || tableName.equals(ROOT))) { + && PathUtils.isTableModelDatabase(partitionInfo.getDataRegion().getDatabaseName())) { // If the table does not exist, it means that the table is all deleted by mods final TsTable table = DataNodeTableCache.getInstance() From 3235551dd98317e8ef9864e4aec32232ec407299 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 17:00:53 +0800 Subject: [PATCH 2/7] f --- .../pipe/it/single/IoTDBPipeOPCUAIT.java | 6 +++--- .../customizer/parameter/PipeParameters.java | 19 ++++++++++++++++++- .../opcua/client/IoTDBOpcUaClient.java | 4 +++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index 67060c6002b59..1ae4ec03fb2b2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -94,8 +94,8 @@ public void testOPCUAServerSink() throws Exception { final Map sinkAttributes = new HashMap<>(); sinkAttributes.put("sink", "opc-ua-sink"); - sinkAttributes.put("opcua.model", "client-server"); - sinkAttributes.put("security-policy", "None"); + sinkAttributes.put("model", "client-server"); + sinkAttributes.put("opcua.security-policy", "None"); OpcUaClient opcUaClient; DataValue value; @@ -103,7 +103,7 @@ public void testOPCUAServerSink() throws Exception { final int[] ports = EnvUtils.searchAvailablePorts(); tcpPort = ports[0]; httpsPort = ports[1]; - sinkAttributes.put("tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); sinkAttributes.put("https.port", Integer.toString(httpsPort)); Assert.assertEquals( diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index 989590f40a6b0..09b70db1f0295 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -92,7 +92,11 @@ public void computeAttributeIfExists( } public String getString(final String key) { - final String value = attributes.get(key); + String value = attributes.get(key); + if (Objects.nonNull(value)) { + return value; + } + value = attributes.get(KeyReducer.shallowReduce(key)); return value != null ? value : attributes.get(KeyReducer.reduce(key)); } @@ -380,6 +384,19 @@ private static class KeyReducer { SECOND_PREFIXES.add("opcua."); } + static String shallowReduce(String key) { + if (key == null) { + return null; + } + final String lowerCaseKey = key.toLowerCase(); + for (final String prefix : FIRST_PREFIXES) { + if (lowerCaseKey.startsWith(prefix)) { + return key.substring(prefix.length()); + } + } + return key; + } + static String reduce(String key) { if (key == null) { return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java index bf96d9881807e..c6d8da47878eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java @@ -242,7 +242,9 @@ public List getNodesToAdd( } public void disconnect() throws Exception { - client.disconnect().get(); + if (Objects.nonNull(client)) { + client.disconnect().get(); + } } /////////////////////////////// Getter /////////////////////////////// From e3ff6db364fa8c39176a362877f974e7bcea077c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 17:14:54 +0800 Subject: [PATCH 3/7] fix --- .../org/apache/iotdb/db/schemaengine/SchemaEngine.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java index 3f59f9edf68ed..42079dcde2669 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java @@ -383,8 +383,14 @@ private long getTimeSeriesNumber(ISchemaRegion schemaRegion) { DataNodeTableCache.getInstance() .getTable( PathUtils.unQualifyDatabaseName(schemaRegion.getDatabaseFullPath()), - tableEntry.getKey()); - return Objects.nonNull(table) ? table.getFieldNum() * tableEntry.getValue() : 0; + tableEntry.getKey(), + false); + if (Objects.isNull(table)) { + logger.warn( + "Failed to get table when calculating the time series number. Maybe the cluster is restarting or the table is being dropped."); + return 0L; + } + return table.getFieldNum() * tableEntry.getValue(); }) .reduce(0L, Long::sum); } From ad85741f830d4d3c7c929347439d909995dec23d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 17:15:51 +0800 Subject: [PATCH 4/7] rest --- .../java/org/apache/iotdb/db/schemaengine/SchemaEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java index 42079dcde2669..2aac27147f0ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java @@ -387,7 +387,7 @@ private long getTimeSeriesNumber(ISchemaRegion schemaRegion) { false); if (Objects.isNull(table)) { logger.warn( - "Failed to get table when calculating the time series number. Maybe the cluster is restarting or the table is being dropped."); + "Failed to get table {}.{} when calculating the time series number. Maybe the cluster is restarting or the table is being dropped.", PathUtils.unQualifyDatabaseName(schemaRegion.getDatabaseFullPath()), tableEntry.getKey()); return 0L; } return table.getFieldNum() * tableEntry.getValue(); From 4b9d581e4b07f7cf09b5db5605b38bf21f458822 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 17:18:34 +0800 Subject: [PATCH 5/7] spls --- .../java/org/apache/iotdb/db/schemaengine/SchemaEngine.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java index 2aac27147f0ff..842281daa9667 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java @@ -387,7 +387,9 @@ private long getTimeSeriesNumber(ISchemaRegion schemaRegion) { false); if (Objects.isNull(table)) { logger.warn( - "Failed to get table {}.{} when calculating the time series number. Maybe the cluster is restarting or the table is being dropped.", PathUtils.unQualifyDatabaseName(schemaRegion.getDatabaseFullPath()), tableEntry.getKey()); + "Failed to get table {}.{} when calculating the time series number. Maybe the cluster is restarting or the table is being dropped.", + PathUtils.unQualifyDatabaseName(schemaRegion.getDatabaseFullPath()), + tableEntry.getKey()); return 0L; } return table.getFieldNum() * tableEntry.getValue(); From b1b9aa2da99082e73517a6d9842cebf53291be90 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 17:23:21 +0800 Subject: [PATCH 6/7] gsa --- .../iotdb/pipe/api/customizer/parameter/PipeParameters.java | 4 +++- .../pipe/api/customizer/parameter/PipeParametersTest.java | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index 09b70db1f0295..c4e7edd63d0ce 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -60,7 +60,9 @@ public Map getAttribute() { } public boolean hasAttribute(final String key) { - return attributes.containsKey(key) || attributes.containsKey(KeyReducer.reduce(key)); + return attributes.containsKey(key) + || attributes.containsKey(KeyReducer.shallowReduce(key)) + || attributes.containsKey(KeyReducer.reduce(key)); } public boolean hasAnyAttributes(final String... keys) { diff --git a/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java b/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java index bf85b5ff91272..d16b8e2879d4a 100644 --- a/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java +++ b/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java @@ -29,10 +29,10 @@ public class PipeParametersTest { @Test public void keyReducerTest() { final PipeParameters parameters = new PipeParameters(new HashMap<>()); - parameters.addAttribute("sink.opcua.with-quality", "false"); + parameters.addAttribute("sink.opcua.with-quality", "true"); - Assert.assertEquals(false, parameters.getBoolean("with-quality")); - Assert.assertEquals(false, parameters.getBoolean("opcua.with-quality")); + Assert.assertEquals(true, parameters.getBoolean("with-quality")); + Assert.assertEquals(true, parameters.getBoolean("opcua.with-quality")); // Invalid parameters.addAttribute("sink.source.opcua.value-name", "false"); From f6b52a6aa9a3775096199c64b270838460411f78 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 18:34:03 +0800 Subject: [PATCH 7/7] fix --- .../java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index 1ae4ec03fb2b2..9ac2f4e17b7d2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -146,7 +146,7 @@ public void testOPCUAServerSink() throws Exception { final int[] ports = EnvUtils.searchAvailablePorts(); tcpPort = ports[0]; httpsPort = ports[1]; - sinkAttributes.put("tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); sinkAttributes.put("https.port", Integer.toString(httpsPort)); sinkAttributes.put("with-quality", "true");