diff --git a/demo/demo-etcd/gateway/pom.xml b/demo/demo-etcd/gateway/pom.xml
index b2eb1e51df2..9bed75de856 100644
--- a/demo/demo-etcd/gateway/pom.xml
+++ b/demo/demo-etcd/gateway/pom.xml
@@ -41,7 +41,7 @@
com.google.protobuf
protobuf-java
- 3.25.3
+ 3.25.5
runtime
diff --git a/demo/demo-etcd/provider/pom.xml b/demo/demo-etcd/provider/pom.xml
index c1413de54ce..cc926bda1ce 100644
--- a/demo/demo-etcd/provider/pom.xml
+++ b/demo/demo-etcd/provider/pom.xml
@@ -41,13 +41,17 @@
com.google.protobuf
protobuf-java
- 3.25.3
+ 3.25.5
runtime
org.apache.servicecomb
registry-etcd
+
+ org.apache.servicecomb
+ config-etcd
+
io.reactivex.rxjava3
diff --git a/demo/demo-etcd/provider/src/main/resources/application.yml b/demo/demo-etcd/provider/src/main/resources/application.yml
index 75b6dee9ea6..da2051cd77e 100644
--- a/demo/demo-etcd/provider/src/main/resources/application.yml
+++ b/demo/demo-etcd/provider/src/main/resources/application.yml
@@ -26,6 +26,10 @@ servicecomb:
registry:
etcd:
connectString: http://127.0.0.1:2379
+ config:
+ etcd:
+ connectString: http://127.0.0.1:2379
+ instance-tag: tag1
rest:
address: 0.0.0.0:9094?websocketEnabled=true
@@ -39,6 +43,13 @@ servicecomb:
allowedMethod: "*"
maxAge: 3600
+
key1: 1
key2: 3
key3: 5
+
+test1: env
+test2: applition
+test3: service
+test4: version
+test5: tag
diff --git a/demo/demo-etcd/test-client/pom.xml b/demo/demo-etcd/test-client/pom.xml
index 0c050adcb85..d14d1b1c45f 100644
--- a/demo/demo-etcd/test-client/pom.xml
+++ b/demo/demo-etcd/test-client/pom.xml
@@ -46,6 +46,16 @@
org.apache.servicecomb
registry-local
+
+ io.etcd
+ jetcd-core
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.25.5
+ runtime
+
@@ -60,7 +70,7 @@
- bitnami/etcd:latest
+ bitnami/etcd:3.5.16
etcd
alias
@@ -74,7 +84,7 @@
- etcd.port:2379
+ 2379:2379
yes
diff --git a/demo/demo-etcd/test-client/src/main/java/org/apache/servicecomb/samples/EtcdConfigIT.java b/demo/demo-etcd/test-client/src/main/java/org/apache/servicecomb/samples/EtcdConfigIT.java
new file mode 100644
index 00000000000..75b0c3fdaad
--- /dev/null
+++ b/demo/demo-etcd/test-client/src/main/java/org/apache/servicecomb/samples/EtcdConfigIT.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.samples;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.servicecomb.demo.CategorizedTestCase;
+import org.apache.servicecomb.demo.TestMgr;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestOperations;
+import org.springframework.web.client.RestTemplate;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+
+@Component
+public class EtcdConfigIT implements CategorizedTestCase {
+ RestOperations template = new RestTemplate();
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(EtcdConfigIT.class);
+
+ @Override
+ public void testRestTransport() throws Exception {
+
+ testEnvironment();
+ testApplication();
+ testService();
+ testVersion();
+ testTag();
+ testOverride();
+ }
+
+ private void testOverride() {
+
+ putValue("/servicecomb/config/environment/production/application2.properties",
+ "testValue=t1");
+ putValue("/servicecomb/config/application/production/demo-etcd/application2.properties",
+ "testValue=t2");
+ testGetConfig("testValue", "t2");
+ putValue("/servicecomb/config/service/production/demo-etcd/provider/application2.properties",
+ "testValue=t3");
+ testGetConfig("testValue", "t3");
+ putValue("/servicecomb/config/version/production/demo-etcd/provider/0.0.1/application2.properties",
+ "testValue=t4");
+ testGetConfig("testValue", "t4");
+ putValue("/servicecomb/config/tag/production/demo-etcd/provider/0.0.1/tag1/application2.properties",
+ "testValue=t5");
+ testGetConfig("testValue", "t5");
+ }
+
+ private void testEnvironment() {
+
+ putValue("/servicecomb/config/environment/production/application.properties",
+ "test1=env");
+ putValue("/servicecomb/config/environment/production/application.properties",
+ "test1=env1");
+ testGetConfig("test1", "env1");
+ }
+
+ private void testApplication() {
+
+ putValue("/servicecomb/config/application/production/demo-etcd/application.properties",
+ "test2=applition");
+ putValue("/servicecomb/config/application/production/demo-etcd/application.properties",
+ "test2=applition2");
+ testGetConfig("test2", "applition2");
+ }
+
+ private void testService() {
+
+ putValue("/servicecomb/config/service/production/demo-etcd/provider/application.properties",
+ "test3=service");
+ putValue("/servicecomb/config/service/production/demo-etcd/provider/application.properties",
+ "test3=service3");
+ testGetConfig("test3", "service3");
+ }
+
+ private void testVersion() {
+
+ putValue("/servicecomb/config/version/production/demo-etcd/provider/0.0.1/application.properties",
+ "test3=version");
+ putValue("/servicecomb/config/version/production/demo-etcd/provider/0.0.1/application.properties",
+ "test4=version4");
+ testGetConfig("test4", "version4");
+ }
+
+ private void testTag() {
+
+ putValue("/servicecomb/config/tag/production/demo-etcd/provider/0.0.1/tag1/application.properties",
+ "test5=tag");
+ putValue("/servicecomb/config/tag/production/demo-etcd/provider/0.0.1/tag1/application.properties",
+ "test5=tag5");
+ testGetConfig("test5", "tag5");
+ }
+
+
+ public void putValue(String key, String value) {
+ try (Client client = Client.builder().endpoints("http://localhost:2379").build()) {
+
+ client.getKVClient().put(
+ ByteSequence.from(key, StandardCharsets.UTF_8),
+ ByteSequence.from(value, StandardCharsets.UTF_8)
+ ).get();
+
+ LOGGER.info("Value set successfully");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void testGetConfig(String key, String expectValue) {
+
+ String result = template
+ .getForObject(Config.GATEWAY_URL + "/getConfig?key=" + key, String.class);
+ TestMgr.check(expectValue, result);
+ }
+}
diff --git a/demo/demo-etcd/test-client/src/main/resources/application.yml b/demo/demo-etcd/test-client/src/main/resources/application.yml
index 396bebfd853..4245a4d2c86 100644
--- a/demo/demo-etcd/test-client/src/main/resources/application.yml
+++ b/demo/demo-etcd/test-client/src/main/resources/application.yml
@@ -23,3 +23,15 @@ servicecomb:
rest:
address: 0.0.0.0:9097 # should be same with server.port to use web container
+
+ config:
+ etcd:
+ instance-tag: tag1
+
+test1: env
+test2: applition
+test3: service
+test4: version
+test5: tag
+
+
diff --git a/dependencies/bom/pom.xml b/dependencies/bom/pom.xml
index f2218783fb2..16b2dea9845 100644
--- a/dependencies/bom/pom.xml
+++ b/dependencies/bom/pom.xml
@@ -283,6 +283,11 @@
registry-etcd
${project.version}
+
+ org.apache.servicecomb
+ config-etcd
+ ${project.version}
+
org.apache.servicecomb
diff --git a/dynamic-config/config-etcd/pom.xml b/dynamic-config/config-etcd/pom.xml
new file mode 100644
index 00000000000..a1c35d9872f
--- /dev/null
+++ b/dynamic-config/config-etcd/pom.xml
@@ -0,0 +1,44 @@
+
+
+
+
+
+
+ dynamic-config
+ org.apache.servicecomb
+ 3.3.0-SNAPSHOT
+
+ 4.0.0
+ config-etcd
+ Java Chassis::Dynamic Config::Zookeeper
+
+
+ org.apache.servicecomb
+ foundation-config
+
+
+ org.apache.servicecomb
+ foundation-vertx
+
+
+ io.etcd
+ jetcd-core
+
+
+
diff --git a/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdClient.java b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdClient.java
new file mode 100644
index 00000000000..f3088cb0c7e
--- /dev/null
+++ b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdClient.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.config.etcd;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.config.BootStrapProperties;
+import org.apache.servicecomb.config.etcd.EtcdDynamicPropertiesSource.UpdateHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
+import org.springframework.core.env.Environment;
+import org.springframework.core.io.ByteArrayResource;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.KeyValue;
+import io.etcd.jetcd.Watch;
+import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.WatchOption;
+
+public class EtcdClient {
+
+ public class GetDataRunable implements Runnable {
+
+ private Map dataMap;
+
+ private EtcdClient etcdClient;
+
+ private String path;
+
+ public GetDataRunable(Map dataMap, EtcdClient etcdClient, String path) {
+ this.dataMap = dataMap;
+ this.etcdClient = etcdClient;
+ this.path = path;
+ }
+
+ @Override
+ public void run() {
+ try {
+ dataMap.clear();
+ dataMap.putAll(etcdClient.parseData(path));
+ refreshConfigItems();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(EtcdClient.class);
+
+ public static final String PATH_ENVIRONMENT = "/servicecomb/config/environment/%s";
+
+ public static final String PATH_APPLICATION = "/servicecomb/config/application/%s/%s";
+
+ public static final String PATH_SERVICE = "/servicecomb/config/service/%s/%s/%s";
+
+ public static final String PATH_VERSION = "/servicecomb/config/version/%s/%s/%s/%s";
+
+ public static final String PATH_TAG = "/servicecomb/config/tag/%s/%s/%s/%s/%s";
+
+ private final UpdateHandler updateHandler;
+
+ private final EtcdConfig etcdConfig;
+
+ private final Environment environment;
+
+ private final Object lock = new Object();
+
+ private Map environmentData = new HashMap<>();
+
+ private Map applicationData = new HashMap<>();
+
+ private Map serviceData = new HashMap<>();
+
+ private Map versionData = new HashMap<>();
+
+ private Map tagData = new HashMap<>();
+
+ private Map allLast = new HashMap<>();
+
+ private Client client;
+
+ public EtcdClient(UpdateHandler updateHandler, Environment environment) {
+ this.updateHandler = updateHandler;
+ this.etcdConfig = new EtcdConfig(environment);
+ this.environment = environment;
+ }
+
+ public void getClient() {
+ if (StringUtils.isEmpty(etcdConfig.getAuthInfo())) {
+ this.client = Client.builder().endpoints(etcdConfig.getConnectString()).build();
+ } else {
+ String[] authInfo = etcdConfig.getAuthInfo().split(":");
+ this.client = Client.builder().endpoints(etcdConfig.getConnectString())
+ .user(ByteSequence.from(authInfo[0], StandardCharsets.UTF_8))
+ .password(ByteSequence.from(authInfo[1], StandardCharsets.UTF_8)).build();
+ }
+ }
+
+ public void refreshEtcdConfig() throws Exception {
+
+ getClient();
+ String env = BootStrapProperties.readServiceEnvironment(environment);
+ if (StringUtils.isEmpty(env)) {
+ env = EtcdConfig.ZOOKEEPER_DEFAULT_ENVIRONMENT;
+ }
+ addEnvironmentConfig(env);
+ addApplicationConfig(env);
+ addServiceConfig(env);
+ addVersionConfig(env);
+ addTagConfig(env);
+
+ refreshConfigItems();
+ }
+
+ private void addTagConfig(String env) throws Exception {
+ if (StringUtils.isEmpty(etcdConfig.getInstanceTag())) {
+ return;
+ }
+ String path = String.format(PATH_TAG, env,
+ BootStrapProperties.readApplication(environment),
+ BootStrapProperties.readServiceName(environment),
+ BootStrapProperties.readServiceVersion(environment),
+ etcdConfig.getInstanceTag());
+
+ ByteSequence prefixByteSeq = ByteSequence.from(path, StandardCharsets.UTF_8);
+ Watch watchClient = client.getWatchClient();
+ watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(),
+ resp -> new Thread(new GetDataRunable(tagData, this, path)).start());
+ this.tagData = parseData(path);
+ }
+
+ private void addVersionConfig(String env) throws Exception {
+ String path = String.format(PATH_VERSION, env,
+ BootStrapProperties.readApplication(environment),
+ BootStrapProperties.readServiceName(environment),
+ BootStrapProperties.readServiceVersion(environment));
+
+ ByteSequence prefixByteSeq = ByteSequence.from(path, StandardCharsets.UTF_8);
+ Watch watchClient = client.getWatchClient();
+ watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(),
+ resp -> new Thread(new GetDataRunable(versionData, this, path)).start());
+ this.versionData = parseData(path);
+ }
+
+ private void addServiceConfig(String env) throws Exception {
+ String path = String.format(PATH_SERVICE, env,
+ BootStrapProperties.readApplication(environment),
+ BootStrapProperties.readServiceName(environment));
+
+ ByteSequence prefixByteSeq = ByteSequence.from(path, StandardCharsets.UTF_8);
+ Watch watchClient = client.getWatchClient();
+ watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(),
+ resp -> new Thread(new GetDataRunable(serviceData, this, path)).start());
+ this.serviceData = parseData(path);
+ }
+
+ private void addApplicationConfig(String env) throws Exception {
+ String path = String.format(PATH_APPLICATION, env, BootStrapProperties.readApplication(environment));
+
+ ByteSequence prefixByteSeq = ByteSequence.from(path, StandardCharsets.UTF_8);
+ Watch watchClient = client.getWatchClient();
+ watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(),
+ resp -> new Thread(new GetDataRunable(applicationData, this, path)).start());
+ this.applicationData = parseData(path);
+ }
+
+ private void addEnvironmentConfig(String env) throws Exception {
+ String path = String.format(PATH_ENVIRONMENT, env);
+
+ ByteSequence prefixByteSeq = ByteSequence.from(path, StandardCharsets.UTF_8);
+ Watch watchClient = client.getWatchClient();
+ watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(),
+ resp -> new Thread(new GetDataRunable(environmentData, this, path)).start());
+ this.environmentData = parseData(path);
+ }
+
+ public Map parseData(String path) throws Exception {
+
+ List endpointKv = getValuesByPrefix(path);
+ return getValues(path, endpointKv);
+ }
+
+ private Map getValues(String path, List endpointKv) {
+ Map values = new HashMap<>();
+ for (KeyValue keyValue : endpointKv) {
+ String key = new String(keyValue.getKey().getBytes(), StandardCharsets.UTF_8);
+ String value = new String(keyValue.getValue().getBytes(), StandardCharsets.UTF_8);
+ if (key.equals(path)) {
+ continue;
+ }
+ if (key.endsWith(".yaml") || key.endsWith(".yml")) {
+ YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();
+ yamlFactory.setResources(new ByteArrayResource(value.getBytes(StandardCharsets.UTF_8)));
+ values.putAll(toMap(yamlFactory.getObject()));
+ } else if (key.endsWith(".properties")) {
+ Properties properties = new Properties();
+ try {
+ properties.load(new StringReader(value));
+ } catch (IOException e) {
+ LOGGER.error("load error");
+ }
+ values.putAll(toMap(properties));
+ } else {
+ values.put(key, value);
+ }
+ }
+ return values;
+ }
+
+ private List getValuesByPrefix(String prefix) {
+
+ CompletableFuture getFuture = client.getKVClient()
+ .get(ByteSequence.from(prefix, StandardCharsets.UTF_8),
+ GetOption.builder().withPrefix(ByteSequence.from(prefix, StandardCharsets.UTF_8)).build());
+ GetResponse response = MuteExceptionUtil.builder().withLog("get kv by prefix error")
+ .executeCompletableFuture(getFuture);
+ return response.getKvs();
+ }
+
+ private void refreshConfigItems() {
+ synchronized (lock) {
+ Map all = new HashMap<>();
+ all.putAll(environmentData);
+ all.putAll(applicationData);
+ all.putAll(serviceData);
+ all.putAll(versionData);
+ all.putAll(tagData);
+ updateHandler.handle(all, allLast);
+ this.allLast = all;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map toMap(Properties properties) {
+ if (properties == null) {
+ return Collections.emptyMap();
+ }
+ Map result = new HashMap<>();
+ Enumeration keys = (Enumeration) properties.propertyNames();
+ while (keys.hasMoreElements()) {
+ String key = keys.nextElement();
+ Object value = properties.getProperty(key);
+ result.put(key, value);
+ }
+ return result;
+ }
+}
diff --git a/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdConfig.java b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdConfig.java
new file mode 100644
index 00000000000..57713eed069
--- /dev/null
+++ b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.config.etcd;
+
+import org.springframework.core.env.Environment;
+
+public class EtcdConfig {
+ public static final String ZOOKEEPER_DEFAULT_ENVIRONMENT = "production";
+
+ public static final String PROPERTY_CONNECT_STRING = "servicecomb.config.etcd.connect-string";
+
+ public static final String PROPERTY_SESSION_TIMEOUT = "servicecomb.config.etcd.session-timeout-millis";
+
+ public static final String PROPERTY_CONNECTION_TIMEOUT = "servicecomb.config.etcd.connection-timeout-mills";
+
+ public static final String PROPERTY_AUTH_SCHEMA = "servicecomb.config.etcd.authentication-schema";
+
+ public static final String PROPERTY_AUTH_INFO = "servicecomb.config.etcd.authentication-info";
+
+ public static final String PROPERTY_INSTANCE_TAG = "servicecomb.config.etcd.instance-tag";
+
+ private final Environment environment;
+
+ public EtcdConfig(Environment environment) {
+ this.environment = environment;
+ }
+
+ public String getConnectString() {
+ return environment.getProperty(PROPERTY_CONNECT_STRING, "127.0.0.1:2181");
+ }
+
+ public int getSessionTimeoutMillis() {
+ return environment.getProperty(PROPERTY_SESSION_TIMEOUT, int.class, 60000);
+ }
+
+ public int getConnectionTimeoutMillis() {
+ return environment.getProperty(PROPERTY_CONNECTION_TIMEOUT, int.class, 1000);
+ }
+
+ public String getAuthSchema() {
+ return environment.getProperty(PROPERTY_AUTH_SCHEMA);
+ }
+
+ public String getAuthInfo() {
+ return environment.getProperty(PROPERTY_AUTH_INFO);
+ }
+
+ public String getInstanceTag() {
+ return environment.getProperty(PROPERTY_INSTANCE_TAG);
+ }
+}
diff --git a/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdDynamicPropertiesSource.java b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdDynamicPropertiesSource.java
new file mode 100644
index 00000000000..a86de892180
--- /dev/null
+++ b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdDynamicPropertiesSource.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.config.etcd;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.servicecomb.config.ConfigurationChangedEvent;
+import org.apache.servicecomb.config.DynamicPropertiesSource;
+import org.apache.servicecomb.foundation.common.event.EventManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.env.Environment;
+import org.springframework.core.env.MapPropertySource;
+import org.springframework.core.env.PropertySource;
+
+public class EtcdDynamicPropertiesSource implements DynamicPropertiesSource {
+ public static final String SOURCE_NAME = "etcd";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(EtcdDynamicPropertiesSource.class);
+
+ private final Map valueCache = new ConcurrentHashMap<>();
+
+ public EtcdDynamicPropertiesSource() {
+ }
+
+ private final UpdateHandler updateHandler = new UpdateHandler();
+
+ private void init(Environment environment) {
+ EtcdClient etcdClient = new EtcdClient(updateHandler, environment);
+ try {
+ etcdClient.refreshEtcdConfig();
+ } catch (Exception e) {
+ throw new IllegalStateException("Set up etcd config failed.", e);
+ }
+ }
+
+ public class UpdateHandler {
+ public void handle(Map current, Map last) {
+ ConfigurationChangedEvent event = ConfigurationChangedEvent.createIncremental(current, last);
+ LOGGER.info("Dynamic configuration changed: {}", event.getChanged());
+ valueCache.putAll(event.getAdded());
+ valueCache.putAll(event.getUpdated());
+ event.getDeleted().forEach((k, v) -> valueCache.remove(k));
+ EventManager.post(event);
+ }
+ }
+
+ @Override
+ public PropertySource> create(Environment environment) {
+ init(environment);
+ return new MapPropertySource(SOURCE_NAME, valueCache);
+ }
+
+ @Override
+ public int getOrder() {
+ return 0;
+ }
+}
diff --git a/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/MuteExceptionUtil.java b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/MuteExceptionUtil.java
new file mode 100644
index 00000000000..bc0e237b01f
--- /dev/null
+++ b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/MuteExceptionUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.config.etcd;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MuteExceptionUtil {
+
+ interface FunctionWithException {
+ R apply(T t) throws Exception;
+ }
+
+ interface FunctionWithDoubleParam {
+ R apply(T1 t1, T2 t2) throws Exception;
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MuteExceptionUtil.class);
+
+ public static class MuteExceptionUtilBuilder {
+
+ private String logMessage;
+
+ private Object[] customMessageParams;
+
+ public MuteExceptionUtilBuilder withLog(String message, Object... params) {
+ this.logMessage = message;
+ this.customMessageParams = params;
+ return this;
+ }
+
+ private String getLogMessage(String defaultMessage) {
+ return logMessage != null ? logMessage : defaultMessage;
+ }
+
+ public R executeFunction(FunctionWithException function, T t) {
+ try {
+ return function.apply(t);
+ } catch (Exception e) {
+ LOGGER.error(getLogMessage("execute Function failure..."), customMessageParams, e);
+ return null;
+ }
+ }
+
+ public T executeSupplier(Supplier supplier) {
+ try {
+ return supplier.get();
+ } catch (Exception e) {
+ LOGGER.error(getLogMessage("execute Supplier failure..."), customMessageParams, e);
+ return null;
+ }
+ }
+
+ public T executeCompletableFuture(CompletableFuture completableFuture) {
+ try {
+ return completableFuture.get();
+ } catch (Exception e) {
+ LOGGER.error(getLogMessage("execute CompletableFuture failure..."), customMessageParams, e);
+ return null;
+ }
+ }
+
+ public R executeFunctionWithDoubleParam(FunctionWithDoubleParam function, T1 t1, T2 t2) {
+ try {
+ return function.apply(t1, t2);
+ } catch (Exception e) {
+ LOGGER.error(getLogMessage("execute FunctionWithDoubleParam failure..."), customMessageParams, e);
+ return null;
+ }
+ }
+ }
+
+ public static MuteExceptionUtilBuilder builder() {
+ return new MuteExceptionUtilBuilder();
+ }
+}
diff --git a/dynamic-config/config-etcd/src/main/resources/META-INF/services/org.apache.servicecomb.config.DynamicPropertiesSource b/dynamic-config/config-etcd/src/main/resources/META-INF/services/org.apache.servicecomb.config.DynamicPropertiesSource
new file mode 100644
index 00000000000..d10124395fa
--- /dev/null
+++ b/dynamic-config/config-etcd/src/main/resources/META-INF/services/org.apache.servicecomb.config.DynamicPropertiesSource
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.servicecomb.config.etcd.EtcdDynamicPropertiesSource
diff --git a/dynamic-config/pom.xml b/dynamic-config/pom.xml
index 4189d8817f0..540381370f9 100644
--- a/dynamic-config/pom.xml
+++ b/dynamic-config/pom.xml
@@ -37,6 +37,7 @@
config-nacos
config-kie
config-zookeeper
+ config-etcd
diff --git a/foundations/foundation-common/pom.xml b/foundations/foundation-common/pom.xml
index c377ef211a9..3e07a454296 100644
--- a/foundations/foundation-common/pom.xml
+++ b/foundations/foundation-common/pom.xml
@@ -54,6 +54,12 @@
org.apache.httpcomponents
httpclient
+
+
+ commons-logging
+ commons-logging
+
+
jakarta.ws.rs
diff --git a/foundations/foundation-config/pom.xml b/foundations/foundation-config/pom.xml
index 6a3173f50b4..14629e95a32 100644
--- a/foundations/foundation-config/pom.xml
+++ b/foundations/foundation-config/pom.xml
@@ -74,6 +74,12 @@
org.apache.httpcomponents
httpclient
+
+
+ commons-logging
+ commons-logging
+
+
io.netty
diff --git a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/ConditionWaiter.java b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/ConditionWaiter.java
new file mode 100644
index 00000000000..f0626f14cd2
--- /dev/null
+++ b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/ConditionWaiter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.registry.etcd;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ConditionWaiter {
+ private final AtomicReference dataReference;
+
+ private final AtomicBoolean isComplete;
+
+ private final long sleepDuration;
+
+ private final TimeUnit timeUnit;
+
+ private final ExecutorService executorService;
+
+ public ConditionWaiter(T initialData, long sleepDuration, TimeUnit timeUnit) {
+ this.dataReference = new AtomicReference<>(initialData);
+ this.isComplete = new AtomicBoolean(false);
+ this.sleepDuration = sleepDuration;
+ this.timeUnit = timeUnit;
+ this.executorService = Executors.newSingleThreadExecutor();
+ }
+
+ public T waitForCompletion() {
+ while (!isComplete.get()) {
+ SleepUtil.sleep(sleepDuration, timeUnit);
+ }
+ return dataReference.get();
+ }
+
+ public void setData(T newData) {
+ dataReference.set(newData);
+ }
+
+ public void executeTaskAsync(Callable task) {
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ return task.call();
+ } catch (Exception e) {
+ throw new RuntimeException("Task execution failed", e);
+ }
+ }, executorService).thenAccept(result -> {
+ setData(result);
+ isComplete.set(true);
+ });
+ }
+
+ public static class SleepUtil {
+ public static void sleep(long duration, TimeUnit timeUnit) {
+ try {
+ timeUnit.sleep(duration);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ System.out.println("Thread was interrupted during sleep!");
+ }
+ }
+ }
+}
diff --git a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java
index 3f588826986..407bc1d7d81 100644
--- a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java
+++ b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java
@@ -18,9 +18,12 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -59,7 +62,6 @@ public class EtcdDiscovery implements Discovery {
private Map watchMap = new ConcurrentHashMapEx<>();
-
@Autowired
@SuppressWarnings("unused")
public void setEnvironment(Environment environment) {
@@ -99,8 +101,17 @@ public List findServiceInstances(String application, Stri
return watchClient;
});
- List endpointKv = getValuesByPrefix(prefixPath);
- return convertServiceInstanceList(endpointKv);
+// async get all instances,because sync is bad way in etcd.
+ ConditionWaiter> waiter = new ConditionWaiter<>(new ArrayList<>(), 50,
+ TimeUnit.MILLISECONDS);
+ waiter.executeTaskAsync(() -> {
+ CompletableFuture getFuture = client.getKVClient()
+ .get(ByteSequence.from(prefixPath, StandardCharsets.UTF_8),
+ GetOption.builder().withPrefix(ByteSequence.from(prefixPath, StandardCharsets.UTF_8)).build());
+ GetResponse getResponse = getFuture.get();
+ return convertServiceInstanceList(getResponse.getKvs());
+ });
+ return waiter.waitForCompletion();
}
private void watchNode(String application, String serviceName, String prefixPath) {
@@ -144,16 +155,27 @@ private static EtcdDiscoveryInstance getEtcdDiscoveryInstance(KeyValue keyValue)
.withLog("convert json value to obj from etcd failure, {}", valueJson)
.executeFunctionWithDoubleParam(JsonUtils::readValue, valueJson.getBytes(StandardCharsets.UTF_8),
EtcdInstance.class);
- EtcdDiscoveryInstance etcdDiscoveryInstance = new EtcdDiscoveryInstance(etcdInstance);
- return etcdDiscoveryInstance;
+ return new EtcdDiscoveryInstance(etcdInstance);
}
@Override
public List findServices(String application) {
- String prefixPath = basePath + "/" + application;
- List endpointKv = getValuesByPrefix(prefixPath);
- return endpointKv.stream().map(kv -> kv.getKey().toString(StandardCharsets.UTF_8)).collect(Collectors.toList());
+ ConditionWaiter> waiter = new ConditionWaiter<>(new ArrayList<>(), 50, TimeUnit.MILLISECONDS);
+ waiter.executeTaskAsync(() -> {
+ String prefixPath = basePath + "/" + application;
+ List endpointKv = getValuesByPrefix(prefixPath);
+ return endpointKv.stream()
+ .map(kv -> kv.getKey().toString(StandardCharsets.UTF_8))
+ .map(key -> {
+ String[] parts = StringUtils.split(key, "/");
+ return parts.length > 5 ? parts[4] : null;
+ })
+ .filter(Objects::nonNull)
+ .distinct()
+ .collect(Collectors.toList());
+ });
+ return waiter.waitForCompletion();
}
@Override
diff --git a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/MuteExceptionUtil.java b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/MuteExceptionUtil.java
index 83041cacfbc..eea59100450 100644
--- a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/MuteExceptionUtil.java
+++ b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/MuteExceptionUtil.java
@@ -50,7 +50,6 @@ private String getLogMessage(String defaultMessage) {
return logMessage != null ? logMessage : defaultMessage;
}
- // 执行带异常处理的Function
public R executeFunction(FunctionWithException function, T t) {
try {
return function.apply(t);