Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,16 @@ public void testOPCUAServerSink() throws Exception {
final Map<String, String> sinkAttributes = new HashMap<>();

sinkAttributes.put("sink", "opc-ua-sink");
sinkAttributes.put("opcua.model", "client-server");
sinkAttributes.put("security-policy", "None");
sinkAttributes.put("model", "client-server");
sinkAttributes.put("opcua.security-policy", "None");

OpcUaClient opcUaClient;
DataValue value;
while (true) {
final int[] ports = EnvUtils.searchAvailablePorts();
tcpPort = ports[0];
httpsPort = ports[1];
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
sinkAttributes.put("https.port", Integer.toString(httpsPort));

Assert.assertEquals(
Expand Down Expand Up @@ -146,7 +146,7 @@ public void testOPCUAServerSink() throws Exception {
final int[] ports = EnvUtils.searchAvailablePorts();
tcpPort = ports[0];
httpsPort = ports[1];
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
sinkAttributes.put("https.port", Integer.toString(httpsPort));
sinkAttributes.put("with-quality", "true");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public Map<String, String> getAttribute() {
}

public boolean hasAttribute(final String key) {
return attributes.containsKey(key) || attributes.containsKey(KeyReducer.reduce(key));
return attributes.containsKey(key)
|| attributes.containsKey(KeyReducer.shallowReduce(key))
|| attributes.containsKey(KeyReducer.reduce(key));
}

public boolean hasAnyAttributes(final String... keys) {
Expand Down Expand Up @@ -92,7 +94,11 @@ public void computeAttributeIfExists(
}

public String getString(final String key) {
final String value = attributes.get(key);
String value = attributes.get(key);
if (Objects.nonNull(value)) {
return value;
}
value = attributes.get(KeyReducer.shallowReduce(key));
return value != null ? value : attributes.get(KeyReducer.reduce(key));
}

Expand Down Expand Up @@ -380,6 +386,19 @@ private static class KeyReducer {
SECOND_PREFIXES.add("opcua.");
}

static String shallowReduce(String key) {
if (key == null) {
return null;
}
final String lowerCaseKey = key.toLowerCase();
for (final String prefix : FIRST_PREFIXES) {
if (lowerCaseKey.startsWith(prefix)) {
return key.substring(prefix.length());
}
}
return key;
}

static String reduce(String key) {
if (key == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ public class PipeParametersTest {
@Test
public void keyReducerTest() {
final PipeParameters parameters = new PipeParameters(new HashMap<>());
parameters.addAttribute("sink.opcua.with-quality", "false");
parameters.addAttribute("sink.opcua.with-quality", "true");

Assert.assertEquals(false, parameters.getBoolean("with-quality"));
Assert.assertEquals(false, parameters.getBoolean("opcua.with-quality"));
Assert.assertEquals(true, parameters.getBoolean("with-quality"));
Assert.assertEquals(true, parameters.getBoolean("opcua.with-quality"));

// Invalid
parameters.addAttribute("sink.source.opcua.value-name", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ public List<AddNodesItem> getNodesToAdd(
}

public void disconnect() throws Exception {
client.disconnect().get();
if (Objects.nonNull(client)) {
client.disconnect().get();
}
}

/////////////////////////////// Getter ///////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,16 @@ private long getTimeSeriesNumber(ISchemaRegion schemaRegion) {
DataNodeTableCache.getInstance()
.getTable(
PathUtils.unQualifyDatabaseName(schemaRegion.getDatabaseFullPath()),
tableEntry.getKey());
return Objects.nonNull(table) ? table.getFieldNum() * tableEntry.getValue() : 0;
tableEntry.getKey(),
false);
if (Objects.isNull(table)) {
logger.warn(
"Failed to get table {}.{} when calculating the time series number. Maybe the cluster is restarting or the table is being dropped.",
PathUtils.unQualifyDatabaseName(schemaRegion.getDatabaseFullPath()),
tableEntry.getKey());
return 0L;
}
return table.getFieldNum() * tableEntry.getValue();
})
.reduce(0L, Long::sum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
Expand Down Expand Up @@ -92,9 +93,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.iotdb.db.utils.constant.SqlConstant.ROOT;
import static org.apache.iotdb.db.utils.constant.SqlConstant.TREE_MODEL_DATABASE_PREFIX;

/**
* {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link
* LoadCommand}. This class turn the content of a piece of loading TsFile into a new TsFile. When
Expand Down Expand Up @@ -489,7 +487,7 @@ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
final String tableName =
chunkData.getDevice() != null ? chunkData.getDevice().getTableName() : null;
if (tableName != null
&& !(tableName.startsWith(TREE_MODEL_DATABASE_PREFIX) || tableName.equals(ROOT))) {
&& PathUtils.isTableModelDatabase(partitionInfo.getDataRegion().getDatabaseName())) {
// If the table does not exist, it means that the table is all deleted by mods
final TsTable table =
DataNodeTableCache.getInstance()
Expand Down
Loading