diff --git a/demo/demo-etcd/gateway/pom.xml b/demo/demo-etcd/gateway/pom.xml index b2eb1e51df2..9bed75de856 100644 --- a/demo/demo-etcd/gateway/pom.xml +++ b/demo/demo-etcd/gateway/pom.xml @@ -41,7 +41,7 @@ com.google.protobuf protobuf-java - 3.25.3 + 3.25.5 runtime diff --git a/demo/demo-etcd/provider/pom.xml b/demo/demo-etcd/provider/pom.xml index c1413de54ce..cc926bda1ce 100644 --- a/demo/demo-etcd/provider/pom.xml +++ b/demo/demo-etcd/provider/pom.xml @@ -41,13 +41,17 @@ com.google.protobuf protobuf-java - 3.25.3 + 3.25.5 runtime org.apache.servicecomb registry-etcd + + org.apache.servicecomb + config-etcd + io.reactivex.rxjava3 diff --git a/demo/demo-etcd/provider/src/main/resources/application.yml b/demo/demo-etcd/provider/src/main/resources/application.yml index 75b6dee9ea6..da2051cd77e 100644 --- a/demo/demo-etcd/provider/src/main/resources/application.yml +++ b/demo/demo-etcd/provider/src/main/resources/application.yml @@ -26,6 +26,10 @@ servicecomb: registry: etcd: connectString: http://127.0.0.1:2379 + config: + etcd: + connectString: http://127.0.0.1:2379 + instance-tag: tag1 rest: address: 0.0.0.0:9094?websocketEnabled=true @@ -39,6 +43,13 @@ servicecomb: allowedMethod: "*" maxAge: 3600 + key1: 1 key2: 3 key3: 5 + +test1: env +test2: applition +test3: service +test4: version +test5: tag diff --git a/demo/demo-etcd/test-client/pom.xml b/demo/demo-etcd/test-client/pom.xml index 0c050adcb85..d14d1b1c45f 100644 --- a/demo/demo-etcd/test-client/pom.xml +++ b/demo/demo-etcd/test-client/pom.xml @@ -46,6 +46,16 @@ org.apache.servicecomb registry-local + + io.etcd + jetcd-core + + + com.google.protobuf + protobuf-java + 3.25.5 + runtime + @@ -60,7 +70,7 @@ - bitnami/etcd:latest + bitnami/etcd:3.5.16 etcd alias @@ -74,7 +84,7 @@ - etcd.port:2379 + 2379:2379 yes diff --git a/demo/demo-etcd/test-client/src/main/java/org/apache/servicecomb/samples/EtcdConfigIT.java b/demo/demo-etcd/test-client/src/main/java/org/apache/servicecomb/samples/EtcdConfigIT.java new file mode 100644 index 00000000000..75b0c3fdaad --- /dev/null +++ b/demo/demo-etcd/test-client/src/main/java/org/apache/servicecomb/samples/EtcdConfigIT.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.samples; + +import java.nio.charset.StandardCharsets; + +import org.apache.servicecomb.demo.CategorizedTestCase; +import org.apache.servicecomb.demo.TestMgr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestOperations; +import org.springframework.web.client.RestTemplate; + +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; + +@Component +public class EtcdConfigIT implements CategorizedTestCase { + RestOperations template = new RestTemplate(); + + private static final Logger LOGGER = LoggerFactory.getLogger(EtcdConfigIT.class); + + @Override + public void testRestTransport() throws Exception { + + testEnvironment(); + testApplication(); + testService(); + testVersion(); + testTag(); + testOverride(); + } + + private void testOverride() { + + putValue("/servicecomb/config/environment/production/application2.properties", + "testValue=t1"); + putValue("/servicecomb/config/application/production/demo-etcd/application2.properties", + "testValue=t2"); + testGetConfig("testValue", "t2"); + putValue("/servicecomb/config/service/production/demo-etcd/provider/application2.properties", + "testValue=t3"); + testGetConfig("testValue", "t3"); + putValue("/servicecomb/config/version/production/demo-etcd/provider/0.0.1/application2.properties", + "testValue=t4"); + testGetConfig("testValue", "t4"); + putValue("/servicecomb/config/tag/production/demo-etcd/provider/0.0.1/tag1/application2.properties", + "testValue=t5"); + testGetConfig("testValue", "t5"); + } + + private void testEnvironment() { + + putValue("/servicecomb/config/environment/production/application.properties", + "test1=env"); + putValue("/servicecomb/config/environment/production/application.properties", + "test1=env1"); + testGetConfig("test1", "env1"); + } + + private void testApplication() { + + putValue("/servicecomb/config/application/production/demo-etcd/application.properties", + "test2=applition"); + putValue("/servicecomb/config/application/production/demo-etcd/application.properties", + "test2=applition2"); + testGetConfig("test2", "applition2"); + } + + private void testService() { + + putValue("/servicecomb/config/service/production/demo-etcd/provider/application.properties", + "test3=service"); + putValue("/servicecomb/config/service/production/demo-etcd/provider/application.properties", + "test3=service3"); + testGetConfig("test3", "service3"); + } + + private void testVersion() { + + putValue("/servicecomb/config/version/production/demo-etcd/provider/0.0.1/application.properties", + "test3=version"); + putValue("/servicecomb/config/version/production/demo-etcd/provider/0.0.1/application.properties", + "test4=version4"); + testGetConfig("test4", "version4"); + } + + private void testTag() { + + putValue("/servicecomb/config/tag/production/demo-etcd/provider/0.0.1/tag1/application.properties", + "test5=tag"); + putValue("/servicecomb/config/tag/production/demo-etcd/provider/0.0.1/tag1/application.properties", + "test5=tag5"); + testGetConfig("test5", "tag5"); + } + + + public void putValue(String key, String value) { + try (Client client = Client.builder().endpoints("http://localhost:2379").build()) { + + client.getKVClient().put( + ByteSequence.from(key, StandardCharsets.UTF_8), + ByteSequence.from(value, StandardCharsets.UTF_8) + ).get(); + + LOGGER.info("Value set successfully"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void testGetConfig(String key, String expectValue) { + + String result = template + .getForObject(Config.GATEWAY_URL + "/getConfig?key=" + key, String.class); + TestMgr.check(expectValue, result); + } +} diff --git a/demo/demo-etcd/test-client/src/main/resources/application.yml b/demo/demo-etcd/test-client/src/main/resources/application.yml index 396bebfd853..4245a4d2c86 100644 --- a/demo/demo-etcd/test-client/src/main/resources/application.yml +++ b/demo/demo-etcd/test-client/src/main/resources/application.yml @@ -23,3 +23,15 @@ servicecomb: rest: address: 0.0.0.0:9097 # should be same with server.port to use web container + + config: + etcd: + instance-tag: tag1 + +test1: env +test2: applition +test3: service +test4: version +test5: tag + + diff --git a/dependencies/bom/pom.xml b/dependencies/bom/pom.xml index f2218783fb2..16b2dea9845 100644 --- a/dependencies/bom/pom.xml +++ b/dependencies/bom/pom.xml @@ -283,6 +283,11 @@ registry-etcd ${project.version} + + org.apache.servicecomb + config-etcd + ${project.version} + org.apache.servicecomb diff --git a/dynamic-config/config-etcd/pom.xml b/dynamic-config/config-etcd/pom.xml new file mode 100644 index 00000000000..a1c35d9872f --- /dev/null +++ b/dynamic-config/config-etcd/pom.xml @@ -0,0 +1,44 @@ + + + + + + + dynamic-config + org.apache.servicecomb + 3.3.0-SNAPSHOT + + 4.0.0 + config-etcd + Java Chassis::Dynamic Config::Zookeeper + + + org.apache.servicecomb + foundation-config + + + org.apache.servicecomb + foundation-vertx + + + io.etcd + jetcd-core + + + diff --git a/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdClient.java b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdClient.java new file mode 100644 index 00000000000..f3088cb0c7e --- /dev/null +++ b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdClient.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.config.etcd; + +import java.io.IOException; +import java.io.StringReader; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import org.apache.commons.lang3.StringUtils; +import org.apache.servicecomb.config.BootStrapProperties; +import org.apache.servicecomb.config.etcd.EtcdDynamicPropertiesSource.UpdateHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.config.YamlPropertiesFactoryBean; +import org.springframework.core.env.Environment; +import org.springframework.core.io.ByteArrayResource; + +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.KeyValue; +import io.etcd.jetcd.Watch; +import io.etcd.jetcd.kv.GetResponse; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.WatchOption; + +public class EtcdClient { + + public class GetDataRunable implements Runnable { + + private Map dataMap; + + private EtcdClient etcdClient; + + private String path; + + public GetDataRunable(Map dataMap, EtcdClient etcdClient, String path) { + this.dataMap = dataMap; + this.etcdClient = etcdClient; + this.path = path; + } + + @Override + public void run() { + try { + dataMap.clear(); + dataMap.putAll(etcdClient.parseData(path)); + refreshConfigItems(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private static final Logger LOGGER = LoggerFactory.getLogger(EtcdClient.class); + + public static final String PATH_ENVIRONMENT = "/servicecomb/config/environment/%s"; + + public static final String PATH_APPLICATION = "/servicecomb/config/application/%s/%s"; + + public static final String PATH_SERVICE = "/servicecomb/config/service/%s/%s/%s"; + + public static final String PATH_VERSION = "/servicecomb/config/version/%s/%s/%s/%s"; + + public static final String PATH_TAG = "/servicecomb/config/tag/%s/%s/%s/%s/%s"; + + private final UpdateHandler updateHandler; + + private final EtcdConfig etcdConfig; + + private final Environment environment; + + private final Object lock = new Object(); + + private Map environmentData = new HashMap<>(); + + private Map applicationData = new HashMap<>(); + + private Map serviceData = new HashMap<>(); + + private Map versionData = new HashMap<>(); + + private Map tagData = new HashMap<>(); + + private Map allLast = new HashMap<>(); + + private Client client; + + public EtcdClient(UpdateHandler updateHandler, Environment environment) { + this.updateHandler = updateHandler; + this.etcdConfig = new EtcdConfig(environment); + this.environment = environment; + } + + public void getClient() { + if (StringUtils.isEmpty(etcdConfig.getAuthInfo())) { + this.client = Client.builder().endpoints(etcdConfig.getConnectString()).build(); + } else { + String[] authInfo = etcdConfig.getAuthInfo().split(":"); + this.client = Client.builder().endpoints(etcdConfig.getConnectString()) + .user(ByteSequence.from(authInfo[0], StandardCharsets.UTF_8)) + .password(ByteSequence.from(authInfo[1], StandardCharsets.UTF_8)).build(); + } + } + + public void refreshEtcdConfig() throws Exception { + + getClient(); + String env = BootStrapProperties.readServiceEnvironment(environment); + if (StringUtils.isEmpty(env)) { + env = EtcdConfig.ZOOKEEPER_DEFAULT_ENVIRONMENT; + } + addEnvironmentConfig(env); + addApplicationConfig(env); + addServiceConfig(env); + addVersionConfig(env); + addTagConfig(env); + + refreshConfigItems(); + } + + private void addTagConfig(String env) throws Exception { + if (StringUtils.isEmpty(etcdConfig.getInstanceTag())) { + return; + } + String path = String.format(PATH_TAG, env, + BootStrapProperties.readApplication(environment), + BootStrapProperties.readServiceName(environment), + BootStrapProperties.readServiceVersion(environment), + etcdConfig.getInstanceTag()); + + ByteSequence prefixByteSeq = ByteSequence.from(path, StandardCharsets.UTF_8); + Watch watchClient = client.getWatchClient(); + watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(), + resp -> new Thread(new GetDataRunable(tagData, this, path)).start()); + this.tagData = parseData(path); + } + + private void addVersionConfig(String env) throws Exception { + String path = String.format(PATH_VERSION, env, + BootStrapProperties.readApplication(environment), + BootStrapProperties.readServiceName(environment), + BootStrapProperties.readServiceVersion(environment)); + + ByteSequence prefixByteSeq = ByteSequence.from(path, StandardCharsets.UTF_8); + Watch watchClient = client.getWatchClient(); + watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(), + resp -> new Thread(new GetDataRunable(versionData, this, path)).start()); + this.versionData = parseData(path); + } + + private void addServiceConfig(String env) throws Exception { + String path = String.format(PATH_SERVICE, env, + BootStrapProperties.readApplication(environment), + BootStrapProperties.readServiceName(environment)); + + ByteSequence prefixByteSeq = ByteSequence.from(path, StandardCharsets.UTF_8); + Watch watchClient = client.getWatchClient(); + watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(), + resp -> new Thread(new GetDataRunable(serviceData, this, path)).start()); + this.serviceData = parseData(path); + } + + private void addApplicationConfig(String env) throws Exception { + String path = String.format(PATH_APPLICATION, env, BootStrapProperties.readApplication(environment)); + + ByteSequence prefixByteSeq = ByteSequence.from(path, StandardCharsets.UTF_8); + Watch watchClient = client.getWatchClient(); + watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(), + resp -> new Thread(new GetDataRunable(applicationData, this, path)).start()); + this.applicationData = parseData(path); + } + + private void addEnvironmentConfig(String env) throws Exception { + String path = String.format(PATH_ENVIRONMENT, env); + + ByteSequence prefixByteSeq = ByteSequence.from(path, StandardCharsets.UTF_8); + Watch watchClient = client.getWatchClient(); + watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(), + resp -> new Thread(new GetDataRunable(environmentData, this, path)).start()); + this.environmentData = parseData(path); + } + + public Map parseData(String path) throws Exception { + + List endpointKv = getValuesByPrefix(path); + return getValues(path, endpointKv); + } + + private Map getValues(String path, List endpointKv) { + Map values = new HashMap<>(); + for (KeyValue keyValue : endpointKv) { + String key = new String(keyValue.getKey().getBytes(), StandardCharsets.UTF_8); + String value = new String(keyValue.getValue().getBytes(), StandardCharsets.UTF_8); + if (key.equals(path)) { + continue; + } + if (key.endsWith(".yaml") || key.endsWith(".yml")) { + YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean(); + yamlFactory.setResources(new ByteArrayResource(value.getBytes(StandardCharsets.UTF_8))); + values.putAll(toMap(yamlFactory.getObject())); + } else if (key.endsWith(".properties")) { + Properties properties = new Properties(); + try { + properties.load(new StringReader(value)); + } catch (IOException e) { + LOGGER.error("load error"); + } + values.putAll(toMap(properties)); + } else { + values.put(key, value); + } + } + return values; + } + + private List getValuesByPrefix(String prefix) { + + CompletableFuture getFuture = client.getKVClient() + .get(ByteSequence.from(prefix, StandardCharsets.UTF_8), + GetOption.builder().withPrefix(ByteSequence.from(prefix, StandardCharsets.UTF_8)).build()); + GetResponse response = MuteExceptionUtil.builder().withLog("get kv by prefix error") + .executeCompletableFuture(getFuture); + return response.getKvs(); + } + + private void refreshConfigItems() { + synchronized (lock) { + Map all = new HashMap<>(); + all.putAll(environmentData); + all.putAll(applicationData); + all.putAll(serviceData); + all.putAll(versionData); + all.putAll(tagData); + updateHandler.handle(all, allLast); + this.allLast = all; + } + } + + @SuppressWarnings("unchecked") + private Map toMap(Properties properties) { + if (properties == null) { + return Collections.emptyMap(); + } + Map result = new HashMap<>(); + Enumeration keys = (Enumeration) properties.propertyNames(); + while (keys.hasMoreElements()) { + String key = keys.nextElement(); + Object value = properties.getProperty(key); + result.put(key, value); + } + return result; + } +} diff --git a/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdConfig.java b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdConfig.java new file mode 100644 index 00000000000..57713eed069 --- /dev/null +++ b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdConfig.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.config.etcd; + +import org.springframework.core.env.Environment; + +public class EtcdConfig { + public static final String ZOOKEEPER_DEFAULT_ENVIRONMENT = "production"; + + public static final String PROPERTY_CONNECT_STRING = "servicecomb.config.etcd.connect-string"; + + public static final String PROPERTY_SESSION_TIMEOUT = "servicecomb.config.etcd.session-timeout-millis"; + + public static final String PROPERTY_CONNECTION_TIMEOUT = "servicecomb.config.etcd.connection-timeout-mills"; + + public static final String PROPERTY_AUTH_SCHEMA = "servicecomb.config.etcd.authentication-schema"; + + public static final String PROPERTY_AUTH_INFO = "servicecomb.config.etcd.authentication-info"; + + public static final String PROPERTY_INSTANCE_TAG = "servicecomb.config.etcd.instance-tag"; + + private final Environment environment; + + public EtcdConfig(Environment environment) { + this.environment = environment; + } + + public String getConnectString() { + return environment.getProperty(PROPERTY_CONNECT_STRING, "127.0.0.1:2181"); + } + + public int getSessionTimeoutMillis() { + return environment.getProperty(PROPERTY_SESSION_TIMEOUT, int.class, 60000); + } + + public int getConnectionTimeoutMillis() { + return environment.getProperty(PROPERTY_CONNECTION_TIMEOUT, int.class, 1000); + } + + public String getAuthSchema() { + return environment.getProperty(PROPERTY_AUTH_SCHEMA); + } + + public String getAuthInfo() { + return environment.getProperty(PROPERTY_AUTH_INFO); + } + + public String getInstanceTag() { + return environment.getProperty(PROPERTY_INSTANCE_TAG); + } +} diff --git a/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdDynamicPropertiesSource.java b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdDynamicPropertiesSource.java new file mode 100644 index 00000000000..a86de892180 --- /dev/null +++ b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/EtcdDynamicPropertiesSource.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.config.etcd; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.servicecomb.config.ConfigurationChangedEvent; +import org.apache.servicecomb.config.DynamicPropertiesSource; +import org.apache.servicecomb.foundation.common.event.EventManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.env.Environment; +import org.springframework.core.env.MapPropertySource; +import org.springframework.core.env.PropertySource; + +public class EtcdDynamicPropertiesSource implements DynamicPropertiesSource { + public static final String SOURCE_NAME = "etcd"; + + private static final Logger LOGGER = LoggerFactory.getLogger(EtcdDynamicPropertiesSource.class); + + private final Map valueCache = new ConcurrentHashMap<>(); + + public EtcdDynamicPropertiesSource() { + } + + private final UpdateHandler updateHandler = new UpdateHandler(); + + private void init(Environment environment) { + EtcdClient etcdClient = new EtcdClient(updateHandler, environment); + try { + etcdClient.refreshEtcdConfig(); + } catch (Exception e) { + throw new IllegalStateException("Set up etcd config failed.", e); + } + } + + public class UpdateHandler { + public void handle(Map current, Map last) { + ConfigurationChangedEvent event = ConfigurationChangedEvent.createIncremental(current, last); + LOGGER.info("Dynamic configuration changed: {}", event.getChanged()); + valueCache.putAll(event.getAdded()); + valueCache.putAll(event.getUpdated()); + event.getDeleted().forEach((k, v) -> valueCache.remove(k)); + EventManager.post(event); + } + } + + @Override + public PropertySource create(Environment environment) { + init(environment); + return new MapPropertySource(SOURCE_NAME, valueCache); + } + + @Override + public int getOrder() { + return 0; + } +} diff --git a/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/MuteExceptionUtil.java b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/MuteExceptionUtil.java new file mode 100644 index 00000000000..bc0e237b01f --- /dev/null +++ b/dynamic-config/config-etcd/src/main/java/org/apache/servicecomb/config/etcd/MuteExceptionUtil.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.config.etcd; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MuteExceptionUtil { + + interface FunctionWithException { + R apply(T t) throws Exception; + } + + interface FunctionWithDoubleParam { + R apply(T1 t1, T2 t2) throws Exception; + } + + private static final Logger LOGGER = LoggerFactory.getLogger(MuteExceptionUtil.class); + + public static class MuteExceptionUtilBuilder { + + private String logMessage; + + private Object[] customMessageParams; + + public MuteExceptionUtilBuilder withLog(String message, Object... params) { + this.logMessage = message; + this.customMessageParams = params; + return this; + } + + private String getLogMessage(String defaultMessage) { + return logMessage != null ? logMessage : defaultMessage; + } + + public R executeFunction(FunctionWithException function, T t) { + try { + return function.apply(t); + } catch (Exception e) { + LOGGER.error(getLogMessage("execute Function failure..."), customMessageParams, e); + return null; + } + } + + public T executeSupplier(Supplier supplier) { + try { + return supplier.get(); + } catch (Exception e) { + LOGGER.error(getLogMessage("execute Supplier failure..."), customMessageParams, e); + return null; + } + } + + public T executeCompletableFuture(CompletableFuture completableFuture) { + try { + return completableFuture.get(); + } catch (Exception e) { + LOGGER.error(getLogMessage("execute CompletableFuture failure..."), customMessageParams, e); + return null; + } + } + + public R executeFunctionWithDoubleParam(FunctionWithDoubleParam function, T1 t1, T2 t2) { + try { + return function.apply(t1, t2); + } catch (Exception e) { + LOGGER.error(getLogMessage("execute FunctionWithDoubleParam failure..."), customMessageParams, e); + return null; + } + } + } + + public static MuteExceptionUtilBuilder builder() { + return new MuteExceptionUtilBuilder(); + } +} diff --git a/dynamic-config/config-etcd/src/main/resources/META-INF/services/org.apache.servicecomb.config.DynamicPropertiesSource b/dynamic-config/config-etcd/src/main/resources/META-INF/services/org.apache.servicecomb.config.DynamicPropertiesSource new file mode 100644 index 00000000000..d10124395fa --- /dev/null +++ b/dynamic-config/config-etcd/src/main/resources/META-INF/services/org.apache.servicecomb.config.DynamicPropertiesSource @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.servicecomb.config.etcd.EtcdDynamicPropertiesSource diff --git a/dynamic-config/pom.xml b/dynamic-config/pom.xml index 4189d8817f0..540381370f9 100644 --- a/dynamic-config/pom.xml +++ b/dynamic-config/pom.xml @@ -37,6 +37,7 @@ config-nacos config-kie config-zookeeper + config-etcd diff --git a/foundations/foundation-common/pom.xml b/foundations/foundation-common/pom.xml index c377ef211a9..3e07a454296 100644 --- a/foundations/foundation-common/pom.xml +++ b/foundations/foundation-common/pom.xml @@ -54,6 +54,12 @@ org.apache.httpcomponents httpclient + + + commons-logging + commons-logging + + jakarta.ws.rs diff --git a/foundations/foundation-config/pom.xml b/foundations/foundation-config/pom.xml index 6a3173f50b4..14629e95a32 100644 --- a/foundations/foundation-config/pom.xml +++ b/foundations/foundation-config/pom.xml @@ -74,6 +74,12 @@ org.apache.httpcomponents httpclient + + + commons-logging + commons-logging + + io.netty diff --git a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/ConditionWaiter.java b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/ConditionWaiter.java new file mode 100644 index 00000000000..f0626f14cd2 --- /dev/null +++ b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/ConditionWaiter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.registry.etcd; + +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class ConditionWaiter { + private final AtomicReference dataReference; + + private final AtomicBoolean isComplete; + + private final long sleepDuration; + + private final TimeUnit timeUnit; + + private final ExecutorService executorService; + + public ConditionWaiter(T initialData, long sleepDuration, TimeUnit timeUnit) { + this.dataReference = new AtomicReference<>(initialData); + this.isComplete = new AtomicBoolean(false); + this.sleepDuration = sleepDuration; + this.timeUnit = timeUnit; + this.executorService = Executors.newSingleThreadExecutor(); + } + + public T waitForCompletion() { + while (!isComplete.get()) { + SleepUtil.sleep(sleepDuration, timeUnit); + } + return dataReference.get(); + } + + public void setData(T newData) { + dataReference.set(newData); + } + + public void executeTaskAsync(Callable task) { + CompletableFuture.supplyAsync(() -> { + try { + return task.call(); + } catch (Exception e) { + throw new RuntimeException("Task execution failed", e); + } + }, executorService).thenAccept(result -> { + setData(result); + isComplete.set(true); + }); + } + + public static class SleepUtil { + public static void sleep(long duration, TimeUnit timeUnit) { + try { + timeUnit.sleep(duration); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.out.println("Thread was interrupted during sleep!"); + } + } + } +} diff --git a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java index 3f588826986..407bc1d7d81 100644 --- a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java +++ b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java @@ -18,9 +18,12 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -59,7 +62,6 @@ public class EtcdDiscovery implements Discovery { private Map watchMap = new ConcurrentHashMapEx<>(); - @Autowired @SuppressWarnings("unused") public void setEnvironment(Environment environment) { @@ -99,8 +101,17 @@ public List findServiceInstances(String application, Stri return watchClient; }); - List endpointKv = getValuesByPrefix(prefixPath); - return convertServiceInstanceList(endpointKv); +// async get all instances,because sync is bad way in etcd. + ConditionWaiter> waiter = new ConditionWaiter<>(new ArrayList<>(), 50, + TimeUnit.MILLISECONDS); + waiter.executeTaskAsync(() -> { + CompletableFuture getFuture = client.getKVClient() + .get(ByteSequence.from(prefixPath, StandardCharsets.UTF_8), + GetOption.builder().withPrefix(ByteSequence.from(prefixPath, StandardCharsets.UTF_8)).build()); + GetResponse getResponse = getFuture.get(); + return convertServiceInstanceList(getResponse.getKvs()); + }); + return waiter.waitForCompletion(); } private void watchNode(String application, String serviceName, String prefixPath) { @@ -144,16 +155,27 @@ private static EtcdDiscoveryInstance getEtcdDiscoveryInstance(KeyValue keyValue) .withLog("convert json value to obj from etcd failure, {}", valueJson) .executeFunctionWithDoubleParam(JsonUtils::readValue, valueJson.getBytes(StandardCharsets.UTF_8), EtcdInstance.class); - EtcdDiscoveryInstance etcdDiscoveryInstance = new EtcdDiscoveryInstance(etcdInstance); - return etcdDiscoveryInstance; + return new EtcdDiscoveryInstance(etcdInstance); } @Override public List findServices(String application) { - String prefixPath = basePath + "/" + application; - List endpointKv = getValuesByPrefix(prefixPath); - return endpointKv.stream().map(kv -> kv.getKey().toString(StandardCharsets.UTF_8)).collect(Collectors.toList()); + ConditionWaiter> waiter = new ConditionWaiter<>(new ArrayList<>(), 50, TimeUnit.MILLISECONDS); + waiter.executeTaskAsync(() -> { + String prefixPath = basePath + "/" + application; + List endpointKv = getValuesByPrefix(prefixPath); + return endpointKv.stream() + .map(kv -> kv.getKey().toString(StandardCharsets.UTF_8)) + .map(key -> { + String[] parts = StringUtils.split(key, "/"); + return parts.length > 5 ? parts[4] : null; + }) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + }); + return waiter.waitForCompletion(); } @Override diff --git a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/MuteExceptionUtil.java b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/MuteExceptionUtil.java index 83041cacfbc..eea59100450 100644 --- a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/MuteExceptionUtil.java +++ b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/MuteExceptionUtil.java @@ -50,7 +50,6 @@ private String getLogMessage(String defaultMessage) { return logMessage != null ? logMessage : defaultMessage; } - // 执行带异常处理的Function public R executeFunction(FunctionWithException function, T t) { try { return function.apply(t);