diff --git a/clients/config-center-client/src/main/java/org/apache/servicecomb/config/center/client/ConfigCenterAddressManager.java b/clients/config-center-client/src/main/java/org/apache/servicecomb/config/center/client/ConfigCenterAddressManager.java index 3b346153a0b..62dcb7f8da9 100644 --- a/clients/config-center-client/src/main/java/org/apache/servicecomb/config/center/client/ConfigCenterAddressManager.java +++ b/clients/config-center-client/src/main/java/org/apache/servicecomb/config/center/client/ConfigCenterAddressManager.java @@ -28,8 +28,9 @@ public class ConfigCenterAddressManager extends AbstractAddressManager { - public ConfigCenterAddressManager(String projectName, List addresses, EventBus eventBus) { - super(projectName, addresses); + public ConfigCenterAddressManager(String projectName, List addresses, String ownRegion, + String ownAvailableZone, EventBus eventBus) { + super(projectName, addresses, ownRegion, ownAvailableZone); eventBus.register(this); } diff --git a/clients/config-center-client/src/main/java/org/apache/servicecomb/config/center/client/ConfigCenterClient.java b/clients/config-center-client/src/main/java/org/apache/servicecomb/config/center/client/ConfigCenterClient.java index 7763f5ec64a..1b6c4e70fc4 100644 --- a/clients/config-center-client/src/main/java/org/apache/servicecomb/config/center/client/ConfigCenterClient.java +++ b/clients/config-center-client/src/main/java/org/apache/servicecomb/config/center/client/ConfigCenterClient.java @@ -32,6 +32,7 @@ import org.apache.servicecomb.http.client.common.HttpResponse; import org.apache.servicecomb.http.client.common.HttpTransport; import org.apache.servicecomb.http.client.common.HttpUtils; +import org.apache.servicecomb.http.client.event.OperationEvents.UnAuthorizedOperationEvent; import org.apache.servicecomb.http.client.utils.ServiceCombServiceAvailableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,12 +62,15 @@ public class ConfigCenterClient implements ConfigCenterOperation { private final Map> dimensionConfigNames = new HashMap<>(); + private EventBus eventBus; + public ConfigCenterClient(ConfigCenterAddressManager addressManager, HttpTransport httpTransport) { this.addressManager = addressManager; this.httpTransport = httpTransport; } public void setEventBus(EventBus eventBus) { + this.eventBus = eventBus; addressManager.setEventBus(eventBus); } @@ -88,6 +92,7 @@ public QueryConfigurationsResponse queryConfigurations(QueryConfigurationsReques HttpRequest.GET); HttpResponse httpResponse = httpTransport.doRequest(httpRequest); + recordAndSendUnAuthorizedEvent(httpResponse, address); if (httpResponse.getStatusCode() == HttpStatus.SC_OK) { Map> allConfigMap = HttpUtils.deserialize( httpResponse.getContent(), @@ -121,21 +126,17 @@ public QueryConfigurationsResponse queryConfigurations(QueryConfigurationsReques } queryConfigurationsResponse.setConfigurations(configurations); queryConfigurationsResponse.setChanged(true); - addressManager.recordSuccessState(address); return queryConfigurationsResponse; } else if (httpResponse.getStatusCode() == HttpStatus.SC_NOT_MODIFIED) { queryConfigurationsResponse.setChanged(false); - addressManager.recordSuccessState(address); return queryConfigurationsResponse; } else if (httpResponse.getStatusCode() == HttpStatus.SC_TOO_MANY_REQUESTS) { LOGGER.warn("rate limited, keep the local dimension [{}] configs unchanged.", dimensionsInfo); queryConfigurationsResponse.setChanged(false); - addressManager.recordSuccessState(address); return queryConfigurationsResponse; } else if (httpResponse.getStatusCode() == HttpStatus.SC_BAD_REQUEST) { throw new OperationException("Bad request for query configurations."); } else { - addressManager.recordFailState(address); throw new OperationException( "read response failed. status:" + httpResponse.getStatusCode() @@ -151,6 +152,16 @@ public QueryConfigurationsResponse queryConfigurations(QueryConfigurationsReques } } + private void recordAndSendUnAuthorizedEvent(HttpResponse response, String address) { + if (this.eventBus != null && response.getStatusCode() == HttpStatus.SC_UNAUTHORIZED) { + LOGGER.warn("query configuration unauthorized from server [{}], message [{}]", address, response.getMessage()); + addressManager.recordFailState(address); + this.eventBus.post(new UnAuthorizedOperationEvent(address)); + } else { + addressManager.recordSuccessState(address); + } + } + /** * Only the name of the new configuration item is printed. * No log is printed when the configuration content is updated. diff --git a/clients/config-kie-client/src/main/java/org/apache/servicecomb/config/kie/client/KieClient.java b/clients/config-kie-client/src/main/java/org/apache/servicecomb/config/kie/client/KieClient.java index 7d6cd6548c2..175baadfbbf 100644 --- a/clients/config-kie-client/src/main/java/org/apache/servicecomb/config/kie/client/KieClient.java +++ b/clients/config-kie-client/src/main/java/org/apache/servicecomb/config/kie/client/KieClient.java @@ -46,6 +46,7 @@ import org.apache.servicecomb.http.client.common.HttpResponse; import org.apache.servicecomb.http.client.common.HttpTransport; import org.apache.servicecomb.http.client.common.HttpUtils; +import org.apache.servicecomb.http.client.event.OperationEvents.UnAuthorizedOperationEvent; import org.apache.servicecomb.http.client.utils.ServiceCombServiceAvailableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +72,8 @@ public class KieClient implements KieConfigOperation { private final Map> dimensionConfigNames = new HashMap<>(); + private EventBus eventBus; + public KieClient(KieAddressManager addressManager, HttpTransport httpTransport, KieConfiguration kieConfiguration) { this.httpTransport = httpTransport; this.addressManager = addressManager; @@ -78,6 +81,7 @@ public KieClient(KieAddressManager addressManager, HttpTransport httpTransport, } public void setEventBus(EventBus eventBus) { + this.eventBus = eventBus; addressManager.setEventBus(eventBus); } @@ -91,6 +95,7 @@ public ConfigurationsResponse queryConfigurations(ConfigurationsRequest request, HttpRequest httpRequest = new HttpRequest(url, null, null, HttpRequest.GET); HttpResponse httpResponse = httpTransport.doRequest(httpRequest); + recordAndSendUnAuthorizedEvent(httpResponse, address); ConfigurationsResponse configurationsResponse = new ConfigurationsResponse(); if (httpResponse.getStatusCode() == HttpStatus.SC_OK) { revision = httpResponse.getHeader("X-Kie-Revision"); @@ -100,7 +105,6 @@ public ConfigurationsResponse queryConfigurations(ConfigurationsRequest request, configurationsResponse.setConfigurations(configurations); configurationsResponse.setChanged(true); configurationsResponse.setRevision(revision); - addressManager.recordSuccessState(address); return configurationsResponse; } if (httpResponse.getStatusCode() == HttpStatus.SC_BAD_REQUEST) { @@ -108,16 +112,13 @@ public ConfigurationsResponse queryConfigurations(ConfigurationsRequest request, } if (httpResponse.getStatusCode() == HttpStatus.SC_NOT_MODIFIED) { configurationsResponse.setChanged(false); - addressManager.recordSuccessState(address); return configurationsResponse; } if (httpResponse.getStatusCode() == HttpStatus.SC_TOO_MANY_REQUESTS) { LOGGER.warn("rate limited, keep the local dimension [{}] configs unchanged.", request.getLabelsQuery()); configurationsResponse.setChanged(false); - addressManager.recordSuccessState(address); return configurationsResponse; } - addressManager.recordFailState(address); throw new OperationException( "read response failed. status:" + httpResponse.getStatusCode() + "; message:" + httpResponse.getMessage() + "; content:" + httpResponse.getContent()); @@ -128,6 +129,16 @@ public ConfigurationsResponse queryConfigurations(ConfigurationsRequest request, } } + private void recordAndSendUnAuthorizedEvent(HttpResponse response, String address) { + if (this.eventBus != null && response.getStatusCode() == HttpStatus.SC_UNAUTHORIZED) { + LOGGER.warn("query configuration unauthorized from server [{}], message [{}]", address, response.getMessage()); + addressManager.recordFailState(address); + this.eventBus.post(new UnAuthorizedOperationEvent(address)); + } else { + addressManager.recordSuccessState(address); + } + } + /** * Only the name of the new configuration item is printed. * No log is printed when the configuration content is updated. diff --git a/clients/config-kie-client/src/main/java/org/apache/servicecomb/config/kie/client/model/KieAddressManager.java b/clients/config-kie-client/src/main/java/org/apache/servicecomb/config/kie/client/model/KieAddressManager.java index 0742c11fb10..d8b6582d846 100644 --- a/clients/config-kie-client/src/main/java/org/apache/servicecomb/config/kie/client/model/KieAddressManager.java +++ b/clients/config-kie-client/src/main/java/org/apache/servicecomb/config/kie/client/model/KieAddressManager.java @@ -27,8 +27,8 @@ public class KieAddressManager extends AbstractAddressManager { - public KieAddressManager(List addresses, EventBus eventBus) { - super(addresses); + public KieAddressManager(List addresses, String ownRegion, String ownAvailableZone, EventBus eventBus) { + super(addresses, ownRegion, ownAvailableZone); eventBus.register(this); } diff --git a/clients/config-kie-client/src/test/java/org/apache/servicecomb/config/kie/client/model/KieAddressManagerTest.java b/clients/config-kie-client/src/test/java/org/apache/servicecomb/config/kie/client/model/KieAddressManagerTest.java index 6533fc60e57..4fd547a2ce2 100644 --- a/clients/config-kie-client/src/test/java/org/apache/servicecomb/config/kie/client/model/KieAddressManagerTest.java +++ b/clients/config-kie-client/src/test/java/org/apache/servicecomb/config/kie/client/model/KieAddressManagerTest.java @@ -40,7 +40,7 @@ class KieAddressManagerTest { public void kieAddressManagerTest() throws NoSuchFieldException, IllegalAccessException { addresses.add("http://127.0.0.1:30103"); addresses.add("https://127.0.0.2:30103"); - addressManager1 = new KieAddressManager(addresses, new EventBus()); + addressManager1 = new KieAddressManager(addresses, "", "", new EventBus()); Field addressManagerField = addressManager1.getClass().getSuperclass().getDeclaredField("index"); addressManagerField.setAccessible(true); addressManagerField.set(addressManager1, 0); @@ -64,7 +64,7 @@ public void onRefreshEndpointEvent() { Map> zoneAndRegion = new HashMap<>(); zoneAndRegion.put("sameZone", addressAZ); zoneAndRegion.put("sameRegion", addressRG); - addressManager1 = new KieAddressManager(addresses, new EventBus()); + addressManager1 = new KieAddressManager(addresses, "", "", new EventBus()); RefreshEndpointEvent event = new RefreshEndpointEvent(zoneAndRegion, "KIE"); addressManager1.refreshEndpoint(event, "KIE"); diff --git a/clients/dashboard-client/src/main/java/org/apache/servicecomb/dashboard/client/DashboardAddressManager.java b/clients/dashboard-client/src/main/java/org/apache/servicecomb/dashboard/client/DashboardAddressManager.java index 9dd8c6a3448..e80c537bc46 100644 --- a/clients/dashboard-client/src/main/java/org/apache/servicecomb/dashboard/client/DashboardAddressManager.java +++ b/clients/dashboard-client/src/main/java/org/apache/servicecomb/dashboard/client/DashboardAddressManager.java @@ -29,8 +29,8 @@ public class DashboardAddressManager extends AbstractAddressManager { - public DashboardAddressManager(List addresses, EventBus eventBus) { - super(addresses); + public DashboardAddressManager(List addresses, String ownRegion, String ownAvailableZone, EventBus eventBus) { + super(addresses, ownRegion, ownAvailableZone); eventBus.register(this); } diff --git a/clients/dashboard-client/src/test/java/org/apache/servicecomb/dashboard/client/AddressManagerTest.java b/clients/dashboard-client/src/test/java/org/apache/servicecomb/dashboard/client/AddressManagerTest.java index ac0c08b3eb4..448f2d7170f 100644 --- a/clients/dashboard-client/src/test/java/org/apache/servicecomb/dashboard/client/AddressManagerTest.java +++ b/clients/dashboard-client/src/test/java/org/apache/servicecomb/dashboard/client/AddressManagerTest.java @@ -41,7 +41,7 @@ class AddressManagerTest { public void kieAddressManagerTest() throws IllegalAccessException, NoSuchFieldException { addresses.add("http://127.0.0.1:30103"); addresses.add("https://127.0.0.2:30103"); - addressManager1 = new DashboardAddressManager(addresses, new EventBus()); + addressManager1 = new DashboardAddressManager(addresses, "", "", new EventBus()); Field addressManagerField = addressManager1.getClass().getSuperclass().getDeclaredField("index"); addressManagerField.setAccessible(true); addressManagerField.set(addressManager1, 0); @@ -65,7 +65,7 @@ public void onRefreshEndpointEvent() { Map> zoneAndRegion = new HashMap<>(); zoneAndRegion.put("sameZone", addressAZ); zoneAndRegion.put("sameRegion", addressRG); - addressManager1 = new DashboardAddressManager(addresses, new EventBus()); + addressManager1 = new DashboardAddressManager(addresses, "", "", new EventBus()); RefreshEndpointEvent event = new RefreshEndpointEvent(zoneAndRegion, "CseMonitoring"); addressManager1.refreshEndpoint(event, "CseMonitoring"); diff --git a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java index 7493aff4e69..508b523a0bc 100644 --- a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java +++ b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java @@ -17,18 +17,21 @@ package org.apache.servicecomb.http.client.common; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.servicecomb.http.client.event.EngineConnectChangedEvent; import org.apache.servicecomb.http.client.event.RefreshEndpointEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.eventbus.EventBus; @@ -42,6 +45,10 @@ public class AbstractAddressManager { private static final String V3_PREFIX = "/v3/"; + private static final String ZONE = "availableZone"; + + private static final String REGION = "region"; + private static final int ISOLATION_THRESHOLD = 3; private volatile List addresses = new ArrayList<>(); @@ -74,17 +81,58 @@ public class AbstractAddressManager { private EventBus eventBus; - public AbstractAddressManager(List addresses) { + public AbstractAddressManager(List addresses, String ownRegion, String ownAvailableZone) { this.projectName = DEFAULT_PROJECT; - this.addresses.addAll(addresses); - this.defaultAddress.addAll(addresses); + parseAndInitAddresses(addresses, ownRegion, ownAvailableZone, false); this.index = !addresses.isEmpty() ? getRandomIndex() : 0; } - public AbstractAddressManager(String projectName, List addresses) { + /** + * address support config with region/availableZone info, to enable engine affinity calls during startup + * address may be like: + * https://192.168.20.13:30110?region=region1&availableZone=az + * https://192.168.20.13:30100?region=region1&availableZone=az + * When address have no datacenter information, roundRobin using address + * + * @param addresses engine addresses + * @param ownRegion microservice region + * @param ownAvailableZone microservice zone + * @param isFormat is need format + */ + private void parseAndInitAddresses(List addresses, String ownRegion, String ownAvailableZone, + boolean isFormat) { + if (CollectionUtils.isEmpty(addresses)) { + return; + } + List tempList = new ArrayList<>(); + addressAutoRefreshed = addresses.stream().anyMatch(addr -> addr.contains(ZONE) || addr.contains(REGION)); + for (String address : addresses) { + // Compatible IpPortManager init address is 127.0.0.1:30100 + if (!address.startsWith("http")) { + tempList.add(address); + continue; + } + URLEndPoint endpoint = new URLEndPoint(address); + tempList.add(endpoint.toString()); + buildAffinityAddress(endpoint, ownRegion, ownAvailableZone); + } + this.addresses.addAll(isFormat ? this.transformAddress(tempList) : tempList); + this.defaultAddress.addAll(isFormat ? this.transformAddress(tempList) : tempList); + } + + private void buildAffinityAddress(URLEndPoint endpoint, String ownRegion, String ownAvailableZone) { + if (addressAutoRefreshed) { + if (regionAndAZMatch(ownRegion, ownAvailableZone, endpoint.getFirst(REGION), endpoint.getFirst(ZONE))) { + availableZone.add(endpoint.toString()); + } else { + availableRegion.add(endpoint.toString()); + } + } + } + + public AbstractAddressManager(String projectName, List addresses, String ownRegion, String ownAvailableZone) { this.projectName = StringUtils.isEmpty(projectName) ? DEFAULT_PROJECT : projectName; - this.addresses = this.transformAddress(addresses); - this.defaultAddress.addAll(addresses); + parseAndInitAddresses(addresses, ownRegion, ownAvailableZone, true); this.index = !addresses.isEmpty() ? getRandomIndex() : 0; } @@ -170,8 +218,9 @@ private String getAvailableZoneAddress() { return getCurrentAddress(zoneOrRegionAddress); } LOGGER.warn("all auto discovery addresses are isolation, please check server status."); + // when all available address are isolation, it will use config addresses for polling. - return getCurrentAddress(addresses); + return getDefaultAddress(); } private String getCurrentAddress(List addresses) { @@ -221,6 +270,11 @@ public void resetFailureStatus(String address) { addressFailureStatus.put(address, 0); } + /** + * Only authentication failure, IO, and timeout exception record as failed. + * + * @param address request address + */ public void recordFailState(String address) { synchronized (lock) { if (!addressFailureStatus.containsKey(address)) { @@ -271,4 +325,41 @@ public List getIsolationAddresses() { isolationAddresses.addAll(isolationRegionAddress); return isolationAddresses; } + + public String compareAndGetAddress(String host) { + for (String address : defaultAddress) { + if (isAddressHostSame(address, host)) { + return address; + } + } + return ""; + } + + private boolean isAddressHostSame(String address, String host) { + if (StringUtils.isEmpty(host)) { + return false; + } + try { + URI uri = new URI(address); + return host.equals(uri.getHost()); + } catch (Exception e) { + LOGGER.warn("Exception occurred while constructing URI using the address [{}]", address); + } + return false; + } + + private boolean regionAndAZMatch(String ownRegion, String ownAvailableZone, String engineRegion, + String engineAvailableZone) { + return ownRegion.equalsIgnoreCase(engineRegion) && ownAvailableZone.equals(engineAvailableZone); + } + + public void refreshAffinityAddress(Set sameZone, Set sameRegion) { + addressAutoRefreshed = true; + if (!sameZone.isEmpty()) { + availableZone.addAll(sameZone); + } + if (!sameRegion.isEmpty()) { + availableRegion.addAll(sameRegion); + } + } } diff --git a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/HttpTransportImpl.java b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/HttpTransportImpl.java index da87892929c..4f03f112f7c 100644 --- a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/HttpTransportImpl.java +++ b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/HttpTransportImpl.java @@ -18,6 +18,7 @@ package org.apache.servicecomb.http.client.common; import java.io.IOException; +import java.net.URI; import java.util.Map; import org.apache.http.client.HttpClient; @@ -87,7 +88,7 @@ public HttpResponse doRequest(HttpRequest httpRequest) throws IOException { globalHeaders.forEach(httpRequest::addHeader); } - httpRequest.getHeaders().putAll(requestAuthHeaderProvider.loadAuthHeader(createSignRequest())); + httpRequest.getHeaders().putAll(requestAuthHeaderProvider.loadAuthHeader(createSignRequest(httpRequest.getUrl()))); //get Http response org.apache.http.HttpResponse response = httpClient.execute(httpRequest.getRealRequest()); @@ -98,9 +99,15 @@ public HttpResponse doRequest(HttpRequest httpRequest) throws IOException { response.getAllHeaders()); } - private static SignRequest createSignRequest() { - // Now the implementations do not process SignRequest, so return null. Maybe future will use it. - return null; + private static SignRequest createSignRequest(String url) { + try { + URI uri = URI.create(url); + SignRequest signRequest = new SignRequest(); + signRequest.setEndpoint(uri); + return signRequest; + } catch (Exception e) { + return null; + } } @Override diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/OperationEvents.java b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/event/OperationEvents.java similarity index 79% rename from clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/OperationEvents.java rename to clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/event/OperationEvents.java index 62274519ea4..69896f750ec 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/OperationEvents.java +++ b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/event/OperationEvents.java @@ -15,10 +15,18 @@ * limitations under the License. */ -package org.apache.servicecomb.service.center.client; +package org.apache.servicecomb.http.client.event; public abstract class OperationEvents { public static class UnAuthorizedOperationEvent extends OperationEvents { + private final String address; + public UnAuthorizedOperationEvent(String address) { + this.address = address; + } + + public String getAddress() { + return address; + } } } diff --git a/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java b/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java index b831d032491..b14d6827cb8 100644 --- a/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java +++ b/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java @@ -46,9 +46,9 @@ public class AbstractAddressManagerTest { public void setUp() throws NoSuchFieldException, IllegalAccessException { addresses.add("http://127.0.0.1:30103"); addresses.add("https://127.0.0.2:30103"); - addressManager1 = new AbstractAddressManager(addresses); - addressManager2 = new AbstractAddressManager("project", addresses); - addressManager3 = new AbstractAddressManager(null, addresses); + addressManager1 = new AbstractAddressManager(addresses, "", ""); + addressManager2 = new AbstractAddressManager("project", addresses, "", ""); + addressManager3 = new AbstractAddressManager(null, addresses, "", ""); Field addressManagerField = addressManager1.getClass().getDeclaredField("index"); addressManagerField.setAccessible(true); addressManagerField.set(addressManager1, 0); @@ -88,7 +88,7 @@ public void recordStateTest() throws ExecutionException { zoneAndRegion.put("sameZone", addressAZ); zoneAndRegion.put("sameRegion", addressRG); RefreshEndpointEvent event = new RefreshEndpointEvent(zoneAndRegion, "TEST"); - AbstractAddressManager addressManager = new AbstractAddressManager(addresses) {}; + AbstractAddressManager addressManager = new AbstractAddressManager(addresses, "", "") {}; addressManager.refreshEndpoint(event, "TEST"); @@ -124,7 +124,7 @@ public void recordStateTest() throws ExecutionException { @Test public void testMultipleThread() throws Exception { - AbstractAddressManager addressManager = new AbstractAddressManager(addresses); + AbstractAddressManager addressManager = new AbstractAddressManager(addresses, "", ""); String address = "http://127.0.0.3:30100"; CountDownLatch latch = new CountDownLatch(2); @@ -302,4 +302,27 @@ public void normalizeIPV6Test() { uri = addressManager1.normalizeUri("rest://[2008::7:957f:b2d6:1af4:a1f8]:30100"); Assertions.assertEquals("http://[2008::7:957f:b2d6:1af4:a1f8]:30100", uri); } + + @Test + public void compareAndGetAddressTest() { + List testAddr = new ArrayList<>(); + testAddr.add("https://192.168.20.160:30100"); + testAddr.add("https://127.0.0.1:30100"); + testAddr.add("https://127.0.0.3:30100"); + AbstractAddressManager manager = new AbstractAddressManager(testAddr, "", ""); + Assertions.assertTrue(manager.compareAndGetAddress("192.168.20.16").isEmpty()); + Assertions.assertEquals("https://192.168.20.160:30100", manager.compareAndGetAddress("192.168.20.160")); + } + + @Test + public void AddressAffinityTest() { + List testAddr = new ArrayList<>(); + testAddr.add("https://192.168.20.160:30100?region=region1&availableZone=zone1"); + testAddr.add("https://127.0.0.1:30100"); + AbstractAddressManager manager = new AbstractAddressManager(testAddr, "region1", "zone1"); + Assertions.assertEquals("https://192.168.20.160:30100", manager.address()); + + AbstractAddressManager manager2 = new AbstractAddressManager("default", testAddr, "region1", "zone1"); + Assertions.assertEquals("https://192.168.20.160:30100", manager2.address()); + } } diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterAddressManager.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterAddressManager.java index a5fd128ab1e..f955c148af3 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterAddressManager.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterAddressManager.java @@ -26,8 +26,9 @@ import com.google.common.eventbus.Subscribe; public class ServiceCenterAddressManager extends AbstractAddressManager { - public ServiceCenterAddressManager(String projectName, List addresses, EventBus eventBus) { - super(projectName, addresses); + public ServiceCenterAddressManager(String projectName, List addresses, String ownRegion, + String ownAvailableZone, EventBus eventBus) { + super(projectName, addresses, ownRegion, ownAvailableZone); eventBus.register(this); } diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java index 56e07bae055..9f6e2baf88e 100755 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java @@ -33,7 +33,6 @@ import org.apache.servicecomb.http.client.common.HttpTransport; import org.apache.servicecomb.http.client.common.HttpTransportFactory; import org.apache.servicecomb.http.client.common.HttpUtils; -import org.apache.servicecomb.service.center.client.OperationEvents.UnAuthorizedOperationEvent; import org.apache.servicecomb.service.center.client.exception.OperationException; import org.apache.servicecomb.service.center.client.model.CreateMicroserviceInstanceRequest; import org.apache.servicecomb.service.center.client.model.CreateMicroserviceRequest; @@ -76,8 +75,6 @@ public class ServiceCenterClient implements ServiceCenterOperation { private final ServiceCenterRawClient httpClient; - private EventBus eventBus; - private ServiceCenterAddressManager addressManager; public ServiceCenterClient(ServiceCenterRawClient httpClient) { @@ -85,8 +82,8 @@ public ServiceCenterClient(ServiceCenterRawClient httpClient) { } public ServiceCenterClient setEventBus(EventBus eventBus) { - this.eventBus = eventBus; addressManager.setEventBus(eventBus); + this.httpClient.setEventBus(eventBus); return this; } @@ -124,7 +121,6 @@ public MicroserviceInstancesResponse getServiceCenterInstances() { if (response.getStatusCode() == HttpStatus.SC_OK) { return HttpUtils.deserialize(response.getContent(), MicroserviceInstancesResponse.class); } - sendUnAuthorizedEvent(response); throw new OperationException( "get service-center instances fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -145,7 +141,6 @@ public RegisteredMicroserviceResponse registerMicroservice(Microservice microser if (response.getStatusCode() == HttpStatus.SC_OK) { return HttpUtils.deserialize(response.getContent(), RegisteredMicroserviceResponse.class); } - sendUnAuthorizedEvent(response); throw new OperationException( "register service fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -163,7 +158,6 @@ public MicroservicesResponse getMicroserviceList() { if (response.getStatusCode() == HttpStatus.SC_OK) { return HttpUtils.deserialize(response.getContent(), MicroservicesResponse.class); } - sendUnAuthorizedEvent(response); throw new OperationException( "get service List fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -188,7 +182,6 @@ public RegisteredMicroserviceResponse queryServiceId(Microservice microservice) if (response.getStatusCode() == HttpStatus.SC_OK) { return HttpUtils.deserialize(response.getContent(), RegisteredMicroserviceResponse.class); } - sendUnAuthorizedEvent(response); LOGGER.info("Query serviceId fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() + "; content = " + response.getContent()); @@ -211,7 +204,6 @@ public Microservice getMicroserviceByServiceId(String serviceId) { .deserialize(response.getContent(), MicroserviceResponse.class); return microserviceResponse.getService(); } - sendUnAuthorizedEvent(response); throw new OperationException( "get service message fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -233,7 +225,6 @@ public RegisteredMicroserviceInstanceResponse registerMicroserviceInstance(Micro if (response.getStatusCode() == HttpStatus.SC_OK) { return HttpUtils.deserialize(response.getContent(), RegisteredMicroserviceInstanceResponse.class); } - sendUnAuthorizedEvent(response); throw new OperationException( "register service instance fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -274,7 +265,6 @@ public FindMicroserviceInstancesResponse findMicroserviceInstance(String consume result.setModified(false); return result; } - sendUnAuthorizedEvent(response); throw new OperationException( "get service instances list fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -293,7 +283,6 @@ public MicroserviceInstancesResponse getMicroserviceInstanceList(String serviceI if (response.getStatusCode() == HttpStatus.SC_OK) { return HttpUtils.deserialize(response.getContent(), MicroserviceInstancesResponse.class); } - sendUnAuthorizedEvent(response); throw new OperationException( "get service instances list fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -314,7 +303,6 @@ public MicroserviceInstance getMicroserviceInstance(String serviceId, String ins .deserialize(response.getContent(), MicroserviceInstanceResponse.class); return instanceResponse.getInstance(); } - sendUnAuthorizedEvent(response); throw new OperationException( "get service instance message fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -334,7 +322,6 @@ public void deleteMicroserviceInstance(String serviceId, String instanceId) { LOGGER.info("Delete service instance successfully."); return; } - sendUnAuthorizedEvent(response); throw new OperationException( "delete service instance fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -354,7 +341,6 @@ public boolean updateMicroserviceInstanceStatus(String serviceId, String instanc if (response.getStatusCode() == HttpStatus.SC_OK) { return true; } - sendUnAuthorizedEvent(response); throw new OperationException( "update service instance status fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -373,7 +359,6 @@ public void sendHeartBeats(HeartbeatsRequest heartbeatsRequest) { if (response.getStatusCode() == HttpStatus.SC_OK) { return; } - sendUnAuthorizedEvent(response); throw new OperationException( "heartbeats fails, statusCode = " + response.getStatusCode() + "; message = " + response.getMessage() + "; content = " + response.getContent()); @@ -393,7 +378,6 @@ public boolean sendHeartBeat(String serviceId, String instanceId) { if (response.getStatusCode() == HttpStatus.SC_OK) { return true; } - sendUnAuthorizedEvent(response); throw new OperationException( "heartbeats fails, statusCode = " + response.getStatusCode() + "; message = " + response.getMessage() + "; content = " + response.getContent()); @@ -418,7 +402,6 @@ public List getServiceSchemasList(String serviceId, boolean withCont .deserialize(response.getContent(), GetSchemaListResponse.class); return getSchemaResponse.getSchemas(); } - sendUnAuthorizedEvent(response); throw new OperationException( "get service schemas list fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -445,7 +428,6 @@ public String getServiceSchemaContext(String serviceId, String schemaId) { GetSchemaResponse getSchemaResponse = HttpUtils.deserialize(response.getContent(), GetSchemaResponse.class); return getSchemaResponse.getSchema(); } - sendUnAuthorizedEvent(response); throw new OperationException( "get service schema context fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -465,7 +447,6 @@ public boolean registerSchema(String serviceId, String schemaId, CreateSchemaReq if (response.getStatusCode() == HttpStatus.SC_OK) { return true; } - sendUnAuthorizedEvent(response); throw new OperationException( "update service schema fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -488,7 +469,6 @@ public boolean updateServiceSchemaContext(String serviceId, SchemaInfo schemaInf if (response.getStatusCode() == HttpStatus.SC_OK) { return true; } - sendUnAuthorizedEvent(response); throw new OperationException( "update service schema fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -508,7 +488,6 @@ public boolean batchUpdateServiceSchemaContext(String serviceId, ModifySchemasRe if (response.getStatusCode() == HttpStatus.SC_OK) { return true; } - sendUnAuthorizedEvent(response); throw new OperationException( "update service schema fails, statusCode = " + response.getStatusCode() + "; message = " + response .getMessage() @@ -519,18 +498,13 @@ public boolean batchUpdateServiceSchemaContext(String serviceId, ModifySchemasRe } } - private void sendUnAuthorizedEvent(HttpResponse response) { - if (this.eventBus != null && response.getStatusCode() == HttpStatus.SC_UNAUTHORIZED) { - this.eventBus.post(new UnAuthorizedOperationEvent()); - } - } - @Override - public RbacTokenResponse queryToken(RbacTokenRequest request) { + public RbacTokenResponse queryToken(RbacTokenRequest request, String host) { try { + String queryAddress = addressManager.compareAndGetAddress(host); HttpResponse response = httpClient .postHttpRequestAbsoluteUrl("/v4/token", null, - HttpUtils.serialize(request)); + HttpUtils.serialize(request), queryAddress); if (response.getStatusCode() == HttpStatus.SC_OK) { RbacTokenResponse result = HttpUtils.deserialize(response.getContent(), RbacTokenResponse.class); result.setStatusCode(HttpStatus.SC_OK); @@ -570,7 +544,6 @@ public boolean updateMicroserviceProperties(String serviceId, Map headers, String content) + public HttpResponse postHttpRequestAbsoluteUrl(String url, Map headers, String content, String address) throws IOException { - return doHttpRequest(url, true, headers, content, HttpRequest.POST); + return doHttpRequest(url, true, headers, content, HttpRequest.POST, address); } public HttpResponse postHttpRequest(String url, Map headers, String content) throws IOException { - return doHttpRequest(url, false, headers, content, HttpRequest.POST); + return doHttpRequest(url, false, headers, content, HttpRequest.POST, ""); } public HttpResponse putHttpRequest(String url, Map headers, String content) throws IOException { - return doHttpRequest(url, false, headers, content, HttpRequest.PUT); + return doHttpRequest(url, false, headers, content, HttpRequest.PUT, ""); } public HttpResponse deleteHttpRequest(String url, Map headers, String content) throws IOException { - return doHttpRequest(url, false, headers, content, HttpRequest.DELETE); + return doHttpRequest(url, false, headers, content, HttpRequest.DELETE, ""); } private HttpResponse doHttpRequest(String url, boolean absoluteUrl, Map headers, String content, - String method) - throws IOException { - String address = addressManager.address(); + String method, String queryAddress) throws IOException { + String address = StringUtils.isEmpty(queryAddress) ? addressManager.address() : queryAddress; String formatUrl = addressManager.formatUrl(url, absoluteUrl, address); HttpRequest httpRequest = buildHttpRequest(formatUrl, headers, content, method); - + HttpResponse httpResponse; try { - HttpResponse httpResponse = httpTransport.doRequest(httpRequest); - addressManager.recordSuccessState(address); + httpResponse = httpTransport.doRequest(httpRequest); + recordAndSendUnAuthorizedEvent(httpResponse, address); return httpResponse; } catch (IOException e) { addressManager.recordFailState(address); @@ -87,7 +93,9 @@ private HttpResponse doHttpRequest(String url, boolean absoluteUrl, Map headers, St return new HttpRequest(url, headers, content, method); } + public void setEventBus(EventBus eventBus) { + this.eventBus = eventBus; + } + public static class Builder { private String tenantName; diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java index a8be8b98b78..46ba02b8b9a 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java @@ -17,12 +17,14 @@ package org.apache.servicecomb.service.center.client; +import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.servicecomb.foundation.auth.SignRequest; import org.apache.servicecomb.http.client.auth.RequestAuthHeaderProvider; import org.apache.servicecomb.http.client.common.HttpConfiguration.SSLProperties; import org.apache.servicecomb.http.client.common.WebSocketListener; @@ -107,7 +109,7 @@ private void startWatch() { Map headers = new HashMap<>(); headers.put("x-domain-name", this.tenantName); headers.putAll(this.extraGlobalHeaders); - headers.putAll(this.requestAuthHeaderProvider.loadAuthHeader(null)); + headers.putAll(this.requestAuthHeaderProvider.loadAuthHeader(createSignRequest(address))); currentServerUri = convertAddress(address); LOGGER.info("start watch to address {}", currentServerUri); webSocketTransport = new WebSocketTransport(currentServerUri, sslProperties, @@ -121,6 +123,17 @@ private void startWatch() { }); } + private SignRequest createSignRequest(String url) { + try { + URI uri = URI.create(url); + SignRequest signRequest = new SignRequest(); + signRequest.setEndpoint(uri); + return signRequest; + } catch (Exception e) { + return null; + } + } + private String convertAddress(String address) { String url = String.format(WATCH, project, serviceId); if (address.startsWith(HTTP)) { diff --git a/clients/service-center-client/src/test/java/org/apache/servicecomb/service/center/client/ServiceCenterAddressManagerTest.java b/clients/service-center-client/src/test/java/org/apache/servicecomb/service/center/client/ServiceCenterAddressManagerTest.java index f5c316cd848..d425cc5d449 100644 --- a/clients/service-center-client/src/test/java/org/apache/servicecomb/service/center/client/ServiceCenterAddressManagerTest.java +++ b/clients/service-center-client/src/test/java/org/apache/servicecomb/service/center/client/ServiceCenterAddressManagerTest.java @@ -41,7 +41,7 @@ class ServiceCenterAddressManagerTest { @Test public void getUrlPrefix() { addresses.add("http://127.0.0.1:30103"); - addressManager1 = new ServiceCenterAddressManager("project", addresses, new EventBus()); + addressManager1 = new ServiceCenterAddressManager("project", addresses, "", "", new EventBus()); Assertions.assertNotNull(addressManager1); @@ -55,7 +55,7 @@ public void getUrlPrefix() { @Test public void formatUrlTest() { addresses.add("http://127.0.0.1:30103"); - addressManager1 = new ServiceCenterAddressManager("project", addresses, new EventBus()); + addressManager1 = new ServiceCenterAddressManager("project", addresses, "", "", new EventBus()); Assertions.assertNotNull(addressManager1); String address = addressManager1.address(); @@ -76,7 +76,7 @@ public void onRefreshEndpointEvent() { Map> zoneAndRegion = new HashMap<>(); zoneAndRegion.put("sameZone", addressAZ); zoneAndRegion.put("sameRegion", addressRG); - addressManager1 = new ServiceCenterAddressManager("project", addresses, new EventBus()); + addressManager1 = new ServiceCenterAddressManager("project", addresses, "", "", new EventBus()); RefreshEndpointEvent event = new RefreshEndpointEvent(zoneAndRegion, "SERVICECENTER"); addressManager1.refreshEndpoint(event, "SERVICECENTER"); diff --git a/clients/service-center-client/src/test/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClientTest.java b/clients/service-center-client/src/test/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClientTest.java index 29337e31a44..0cda29fc573 100755 --- a/clients/service-center-client/src/test/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClientTest.java +++ b/clients/service-center-client/src/test/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClientTest.java @@ -41,7 +41,8 @@ public class ServiceCenterRawClientTest { public void TestDefaultParameter() throws IOException { HttpTransport httpTransport = Mockito.mock(HttpTransport.class); - ServiceCenterAddressManager addressManager = new ServiceCenterAddressManager(PROJECT_NAME, Arrays.asList("http://127.0.0.1:30100"), new EventBus()); + ServiceCenterAddressManager addressManager = new ServiceCenterAddressManager(PROJECT_NAME, + Arrays.asList("http://127.0.0.1:30100"), "", "", new EventBus()); ServiceCenterRawClient client = new ServiceCenterRawClient.Builder() .setHttpTransport(httpTransport) .setAddressManager(addressManager) diff --git a/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java b/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java index 50ae96db158..bfcfc4bbf9a 100644 --- a/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java +++ b/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java @@ -69,8 +69,8 @@ public RegistryClientTest(Environment environment) { @Override public void testRestTransport() throws Exception { - ServiceCenterAddressManager addressManager = new ServiceCenterAddressManager("default", Arrays.asList("http://127.0.0.1:30100"), - new EventBus()); + ServiceCenterAddressManager addressManager = new ServiceCenterAddressManager("default", + Arrays.asList("http://127.0.0.1:30100"), "", "", new EventBus()); SSLProperties sslProperties = new SSLProperties(); sslProperties.setEnabled(false); ServiceCenterClient serviceCenterClient = new ServiceCenterClient(addressManager, sslProperties, diff --git a/demo/docker-build-config/pom.xml b/demo/docker-build-config/pom.xml index 4faa3efdd33..625e01894ab 100644 --- a/demo/docker-build-config/pom.xml +++ b/demo/docker-build-config/pom.xml @@ -42,7 +42,7 @@ ${project.artifactId}:${project.version} ${project.artifactId} - openjdk:8-jre-alpine + openjdk:8u342-jre 7070 8080 diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/ConfigCenterConfig.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/ConfigCenterConfig.java index 430ab158925..daf3935aee4 100644 --- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/ConfigCenterConfig.java +++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/ConfigCenterConfig.java @@ -58,6 +58,10 @@ public final class ConfigCenterConfig { private static final String CLIENT_SOCKET_TIMEOUT = "servicecomb.config.client.timeout.socket"; + private static final String REGION = "servicecomb.datacenter.region"; + + private static final String AVAILABLE_ZONE = "servicecomb.datacenter.availableZone"; + private ConfigCenterConfig() { } @@ -154,4 +158,12 @@ public int getConnectionRequestTimeout(Configuration configuration) { public int getSocketTimeout(Configuration configuration) { return configuration.getInt(CLIENT_SOCKET_TIMEOUT, 5000); } + + public String getRegion() { + return finalConfig.getString(REGION, ""); + } + + public String getAvailableZone() { + return finalConfig.getString(AVAILABLE_ZONE, ""); + } } diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/ConfigCenterConfigurationSourceImpl.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/ConfigCenterConfigurationSourceImpl.java index 492c5658f2f..37e315212d2 100644 --- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/ConfigCenterConfigurationSourceImpl.java +++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/ConfigCenterConfigurationSourceImpl.java @@ -188,8 +188,9 @@ private HttpTransport createHttpTransport(ConfigCenterAddressManager configCente private static RequestAuthHeaderProvider getRequestAuthHeaderProvider(List authHeaderProviders) { return signRequest -> { + String host = signRequest != null && signRequest.getEndpoint() != null ? signRequest.getEndpoint().getHost() : ""; Map headers = new HashMap<>(); - authHeaderProviders.forEach(provider -> headers.putAll(provider.authHeaders())); + authHeaderProviders.forEach(provider -> headers.putAll(provider.authHeaders(host))); return headers; }; } @@ -198,7 +199,15 @@ private ConfigCenterAddressManager configCenterAddressManager() { return new ConfigCenterAddressManager(ConfigCenterConfig.INSTANCE.getDomainName(), Deployment .getSystemBootStrapInfo(ConfigCenterDefaultDeploymentProvider.SYSTEM_KEY_CONFIG_CENTER).getAccessURL(), - EventManager.getEventBus()); + getRegion(), getAvailableZone(), EventManager.getEventBus()); + } + + private String getRegion() { + return ConfigCenterConfig.INSTANCE.getRegion(); + } + + private String getAvailableZone() { + return ConfigCenterConfig.INSTANCE.getAvailableZone(); } private void updateConfiguration(WatchedUpdateResult result) { diff --git a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/ConfigCenterConfigurationSourceImplTest.java b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/ConfigCenterConfigurationSourceImplTest.java index 4a9b64a886e..f5fa3d40488 100644 --- a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/ConfigCenterConfigurationSourceImplTest.java +++ b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/ConfigCenterConfigurationSourceImplTest.java @@ -36,7 +36,8 @@ void configAddressManagerTest() throws IllegalAccessException, NoSuchFieldExcept List addresses = new ArrayList<>(); addresses.add("http://127.0.0.1:30103"); addresses.add("http://127.0.0.2:30103"); - ConfigCenterAddressManager addressManager = new ConfigCenterAddressManager("test", addresses, EventManager.getEventBus()); + ConfigCenterAddressManager addressManager = new ConfigCenterAddressManager("test", addresses, + "", "", EventManager.getEventBus()); Field addressManagerField = addressManager.getClass().getSuperclass().getDeclaredField("index"); addressManagerField.setAccessible(true); addressManagerField.set(addressManager, 0); @@ -47,7 +48,7 @@ void configAddressManagerTest() throws IllegalAccessException, NoSuchFieldExcept address = addressManager.address(); Assertions.assertEquals("http://127.0.0.1:30103/v3/test", address); - addressManager = new ConfigCenterAddressManager(null, addresses, EventManager.getEventBus()); + addressManager = new ConfigCenterAddressManager(null, addresses, "", "", EventManager.getEventBus()); addressManagerField = addressManager.getClass().getSuperclass().getDeclaredField("index"); addressManagerField.setAccessible(true); addressManagerField.set(addressManager, 0); @@ -65,7 +66,8 @@ void onRefreshEndpointEventTest() { zoneAndRegion.put("sameZone", addressAZ); zoneAndRegion.put("sameRegion", new ArrayList<>()); RefreshEndpointEvent event = new RefreshEndpointEvent(zoneAndRegion, "CseConfigCenter"); - ConfigCenterAddressManager addressManager = new ConfigCenterAddressManager("test", addresses, EventManager.getEventBus()); + ConfigCenterAddressManager addressManager = new ConfigCenterAddressManager("test", addresses, + "", "", EventManager.getEventBus()); addressManager.onRefreshEndpointEvent(event); List availableAZ = addressManager.getAvailableZone(); diff --git a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/center/client/ConfigCenterAddressManagerTest.java b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/center/client/ConfigCenterAddressManagerTest.java index f47cc35beea..4592b867f78 100644 --- a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/center/client/ConfigCenterAddressManagerTest.java +++ b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/center/client/ConfigCenterAddressManagerTest.java @@ -40,8 +40,8 @@ class ConfigCenterAddressManagerTest { public void addressManagerTest() throws NoSuchFieldException, IllegalAccessException { addresses.add("http://127.0.0.1:30103"); addresses.add("https://127.0.0.2:30103"); - addressManager1 = new ConfigCenterAddressManager("project", addresses, new EventBus()); - addressManager2 = new ConfigCenterAddressManager(null, addresses, new EventBus()); + addressManager1 = new ConfigCenterAddressManager("project", addresses, "", "", new EventBus()); + addressManager2 = new ConfigCenterAddressManager(null, addresses, "", "", new EventBus()); Field addressManagerField = addressManager1.getClass().getSuperclass().getDeclaredField("index"); addressManagerField.setAccessible(true); addressManagerField.set(addressManager1, 0); @@ -70,7 +70,7 @@ public void onRefreshEndpointEvent() { Map> zoneAndRegion = new HashMap<>(); zoneAndRegion.put("sameZone", addressAZ); zoneAndRegion.put("sameRegion", addressRG); - addressManager1 = new ConfigCenterAddressManager("project", addresses, new EventBus()); + addressManager1 = new ConfigCenterAddressManager("project", addresses, "", "", new EventBus()); RefreshEndpointEvent event = new RefreshEndpointEvent(zoneAndRegion, "CseConfigCenter"); addressManager1.refreshEndpoint(event, "CseConfigCenter"); diff --git a/dynamic-config/config-kie/src/main/java/org/apache/servicecomb/config/kie/KieConfig.java b/dynamic-config/config-kie/src/main/java/org/apache/servicecomb/config/kie/KieConfig.java index 872165f51eb..e556c778e8e 100644 --- a/dynamic-config/config-kie/src/main/java/org/apache/servicecomb/config/kie/KieConfig.java +++ b/dynamic-config/config-kie/src/main/java/org/apache/servicecomb/config/kie/KieConfig.java @@ -75,6 +75,10 @@ public class KieConfig { private static final String CUSTOM_LABEL_VALUE_DEFAULT = ""; + private static final String REGION = "servicecomb.datacenter.region"; + + private static final String AVAILABLE_ZONE = "servicecomb.datacenter.availableZone"; + private KieConfig() { } @@ -185,4 +189,12 @@ public String getProxyUsername() { public String getProxyPasswd() { return finalConfig.getString(VertxConst.PROXY_PASSWD, null); } + + public String getRegion() { + return finalConfig.getString(REGION, ""); + } + + public String getAvailableZone() { + return finalConfig.getString(AVAILABLE_ZONE, ""); + } } diff --git a/dynamic-config/config-kie/src/main/java/org/apache/servicecomb/config/kie/KieConfigurationSourceImpl.java b/dynamic-config/config-kie/src/main/java/org/apache/servicecomb/config/kie/KieConfigurationSourceImpl.java index 51446608909..683a81b8394 100644 --- a/dynamic-config/config-kie/src/main/java/org/apache/servicecomb/config/kie/KieConfigurationSourceImpl.java +++ b/dynamic-config/config-kie/src/main/java/org/apache/servicecomb/config/kie/KieConfigurationSourceImpl.java @@ -169,15 +169,19 @@ private HttpTransport createHttpTransport(KieAddressManager kieAddressManager, R private static RequestAuthHeaderProvider getRequestAuthHeaderProvider(List authHeaderProviders) { return signRequest -> { + String host = signRequest != null && signRequest.getEndpoint() != null ? signRequest.getEndpoint().getHost() : ""; Map headers = new HashMap<>(); - authHeaderProviders.forEach(provider -> headers.putAll(provider.authHeaders())); + authHeaderProviders.forEach(provider -> headers.putAll(provider.authHeaders(host))); return headers; }; } private KieAddressManager configKieAddressManager() { + String region = KieConfig.INSTANCE.getRegion(); + String availableZone = KieConfig.INSTANCE.getAvailableZone(); return new KieAddressManager( - Arrays.asList(KieConfig.INSTANCE.getServerUri().split(",")), EventManager.getEventBus()); + Arrays.asList(KieConfig.INSTANCE.getServerUri().split(",")), region, availableZone, + EventManager.getEventBus()); } private void updateConfiguration(WatchedUpdateResult result) { diff --git a/foundations/foundation-spi/src/main/java/org/apache/servicecomb/foundation/auth/AuthHeaderProvider.java b/foundations/foundation-spi/src/main/java/org/apache/servicecomb/foundation/auth/AuthHeaderProvider.java index 313c19ee5ba..0e036e21a58 100644 --- a/foundations/foundation-spi/src/main/java/org/apache/servicecomb/foundation/auth/AuthHeaderProvider.java +++ b/foundations/foundation-spi/src/main/java/org/apache/servicecomb/foundation/auth/AuthHeaderProvider.java @@ -21,11 +21,21 @@ import java.util.Map; public interface AuthHeaderProvider { - default Map authHeaders() { + /** + * Obtain RBAC authentication request header, host is the key of cache + * + * @param host engine address ip + * @return auth headers + */ + default Map authHeaders(String host) { return new HashMap<>(0); } default Map getSignAuthHeaders(SignRequest request) { - return authHeaders(); + String host = ""; + if (request != null && request.getEndpoint() != null) { + host = request.getEndpoint().getHost(); + } + return authHeaders(host); } } diff --git a/huawei-cloud/dashboard/src/main/java/org/apache/servicecomb/huaweicloud/dashboard/monitor/DefaultMonitorDataPublisher.java b/huawei-cloud/dashboard/src/main/java/org/apache/servicecomb/huaweicloud/dashboard/monitor/DefaultMonitorDataPublisher.java index de350f78779..920f0b10396 100644 --- a/huawei-cloud/dashboard/src/main/java/org/apache/servicecomb/huaweicloud/dashboard/monitor/DefaultMonitorDataPublisher.java +++ b/huawei-cloud/dashboard/src/main/java/org/apache/servicecomb/huaweicloud/dashboard/monitor/DefaultMonitorDataPublisher.java @@ -45,6 +45,8 @@ import org.apache.servicecomb.huaweicloud.dashboard.monitor.model.MonitorDataProvider; import org.apache.servicecomb.huaweicloud.dashboard.monitor.model.MonitorDataPublisher; +import com.netflix.config.DynamicPropertyFactory; + public class DefaultMonitorDataPublisher implements MonitorDataPublisher { private static final String SSL_KEY = "mc.consumer"; @@ -76,7 +78,11 @@ private DashboardAddressManager createDashboardAddressManager() { throw new IllegalStateException("dashboard address is not configured."); } - return new DashboardAddressManager(addresses, EventManager.getEventBus()); + String region = DynamicPropertyFactory.getInstance(). + getStringProperty("servicecomb.datacenter.region", "").get(); + String availableZone = DynamicPropertyFactory.getInstance(). + getStringProperty("servicecomb.datacenter.availableZone", "").get(); + return new DashboardAddressManager(addresses, region, availableZone, EventManager.getEventBus()); } private HttpTransport createHttpTransport(DashboardAddressManager addressManager, RequestConfig requestConfig, @@ -111,8 +117,9 @@ private HttpTransport createHttpTransport(DashboardAddressManager addressManager private static RequestAuthHeaderProvider getRequestAuthHeaderProvider(List authHeaderProviders) { return signRequest -> { + String host = signRequest != null && signRequest.getEndpoint() != null ? signRequest.getEndpoint().getHost() : ""; Map headers = new HashMap<>(); - authHeaderProviders.forEach(provider -> headers.putAll(provider.authHeaders())); + authHeaderProviders.forEach(provider -> headers.putAll(provider.authHeaders(host))); return headers; }; } diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/RBACBootStrapService.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/RBACBootStrapService.java index 1c599c4cca7..a3671a3eb4d 100644 --- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/RBACBootStrapService.java +++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/RBACBootStrapService.java @@ -93,8 +93,10 @@ Cipher getCipher(String cipherName) { } private ServiceCenterAddressManager createAddressManager(Environment environment) { + String region = environment.getProperty("servicecomb.datacenter.region", ""); + String availableZone = environment.getProperty("servicecomb.datacenter.availableZone", ""); return new ServiceCenterAddressManager(getTenantName(environment), - getRBACAddressList(environment), EventManager.getEventBus()); + getRBACAddressList(environment), region, availableZone, EventManager.getEventBus()); } private SSLProperties createSSLProperties(Environment environment, String tag) { diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenAuthHeaderProvider.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenAuthHeaderProvider.java index 0f075568313..b040f516f65 100644 --- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenAuthHeaderProvider.java +++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenAuthHeaderProvider.java @@ -26,8 +26,8 @@ public class TokenAuthHeaderProvider implements AuthHeaderProvider { @Override - public Map authHeaders() { - String token = TokenCacheManager.getInstance().getToken(RBACBootStrapService.DEFAULT_REGISTRY_NAME); + public Map authHeaders(String host) { + String token = TokenCacheManager.getInstance().getToken(host); if (StringUtils.isEmpty(token)) { return new HashMap<>(); } diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java index e43f9127b7f..31d6a0ee911 100644 --- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java +++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java @@ -17,9 +17,8 @@ package org.apache.servicecomb.serviceregistry.auth; +import java.net.URI; import java.util.Map; -import java.util.Objects; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -28,13 +27,11 @@ import org.apache.commons.lang3.StringUtils; import org.apache.servicecomb.foundation.auth.Cipher; -import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx; -import org.apache.servicecomb.http.client.event.EngineConnectChangedEvent; +import org.apache.servicecomb.http.client.event.OperationEvents.UnAuthorizedOperationEvent; import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus; import org.apache.servicecomb.service.center.client.ServiceCenterClient; import org.apache.servicecomb.service.center.client.model.RbacTokenRequest; import org.apache.servicecomb.service.center.client.model.RbacTokenResponse; -import org.apache.servicecomb.serviceregistry.event.NotPermittedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,17 +52,15 @@ public final class TokenCacheManager { private static final TokenCacheManager INSTANCE = new TokenCacheManager(); - - private final Map tokenCacheMap; - private Map serviceCenterClients; + private TokenCache tokenCache; + public static TokenCacheManager getInstance() { return INSTANCE; } private TokenCacheManager() { - tokenCacheMap = new ConcurrentHashMapEx<>(); } public void setServiceCenterClients(Map serviceCenterClients) { @@ -73,24 +68,17 @@ public void setServiceCenterClients(Map serviceCent } public void addTokenCache(String registryName, String accountName, String password, Cipher cipher) { - Objects.requireNonNull(registryName, "registryName should not be null!"); - if (tokenCacheMap.containsKey(registryName)) { - LOGGER.warn("duplicate token cache registration for serviceRegistry[{}]", registryName); - return; - } - - tokenCacheMap.put(registryName, new TokenCache(registryName, accountName, password, cipher)); + tokenCache = new TokenCache(registryName, accountName, password, cipher); } - public String getToken(String registryName) { - return Optional.ofNullable(tokenCacheMap.get(registryName)) - .map(TokenCache::getToken) - .orElse(null); + public String getToken(String host) { + if (tokenCache == null) { + return null; + } + return tokenCache.getToken(host); } public class TokenCache { - private static final String UN_AUTHORIZED_CODE_HALF_OPEN = "401302"; - private static final long TOKEN_REFRESH_TIME_IN_SECONDS = 20 * 60 * 1000; private final String registryName; @@ -105,10 +93,6 @@ public class TokenCache { private final Cipher cipher; - private int lastStatusCode; - - private String lastErrorCode; - public TokenCache(String registryName, String accountName, String password, Cipher cipher) { this.registryName = registryName; @@ -133,12 +117,12 @@ public void run() { .build(new CacheLoader() { @Override public String load(String key) throws Exception { - return createHeaders(); + return createHeaders(key); } @Override public ListenableFuture reload(String key, String oldValue) throws Exception { - return Futures.submit(() -> createHeaders(), executorService); + return Futures.submit(() -> createHeaders(key), executorService); } }); ServiceCenterEventBus.getEventBus().register(this); @@ -146,34 +130,28 @@ public ListenableFuture reload(String key, String oldValue) throws Excep } @Subscribe - public void onNotPermittedEvent(NotPermittedEvent event) { - this.executorService.submit(() -> { - if (lastStatusCode == Status.UNAUTHORIZED.getStatusCode() && UN_AUTHORIZED_CODE_HALF_OPEN - .equals(lastErrorCode)) { - cache.refresh(registryName); - } - }); + public void onUnAuthorizedOperationEvent(UnAuthorizedOperationEvent event) { + LOGGER.warn("address {} unAuthorized, refresh cache token!", event.getAddress()); + cache.refresh(getHostByAddress(event.getAddress())); } - @Subscribe - public void onEngineConnectChangedEvent(EngineConnectChangedEvent event) { - cache.refresh(registryName); + private String getHostByAddress(String address) { + try { + URI uri = URI.create(address); + return uri.getHost(); + } catch (Exception e) { + LOGGER.error("get host by address [{}] error!", address, e); + return registryName; + } } - private String createHeaders() { - LOGGER.info("start to create RBAC headers"); - + private String createHeaders(String host) { + LOGGER.info("start to create RBAC headers for host: {}", host); ServiceCenterClient serviceCenterClient = serviceCenterClients.get(this.registryName); - RbacTokenRequest request = new RbacTokenRequest(); request.setName(accountName); request.setPassword(new String(cipher.decrypt(password.toCharArray()))); - - RbacTokenResponse rbacTokenResponse = serviceCenterClient.queryToken(request); - - this.lastStatusCode = rbacTokenResponse.getStatusCode(); - this.lastErrorCode = rbacTokenResponse.getErrorCode(); - + RbacTokenResponse rbacTokenResponse = serviceCenterClient.queryToken(request, ""); if (Status.UNAUTHORIZED.getStatusCode() == rbacTokenResponse.getStatusCode() || Status.FORBIDDEN.getStatusCode() == rbacTokenResponse.getStatusCode()) { // password wrong, do not try anymore @@ -185,8 +163,14 @@ private String createHeaders() { LOGGER.warn("service center do not support RBAC token, you should not config account info"); return INVALID_TOKEN; } + if (Status.INTERNAL_SERVER_ERROR.getStatusCode() == rbacTokenResponse.getStatusCode()) { + // return null for server_error, so the token information can be re-fetched on the next call. + // It will prompt 'CacheLoader returned null for key xxx' + LOGGER.warn("service center query RBAC token error!"); + return null; + } - LOGGER.info("refresh token successfully {}", rbacTokenResponse.getStatusCode()); + LOGGER.info("refresh host [{}] token successfully {}", host, rbacTokenResponse.getStatusCode()); return rbacTokenResponse.getToken(); } @@ -194,13 +178,16 @@ protected long refreshTime() { return TOKEN_REFRESH_TIME_IN_SECONDS; } - public String getToken() { + public String getToken(String host) { if (!enabled()) { return null; } - + String address = host; + if (StringUtils.isEmpty(address)) { + address = registryName; + } try { - return cache.get(registryName); + return cache.get(address); } catch (Exception e) { LOGGER.error("failed to create token", e); return null; diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/IpPortManager.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/IpPortManager.java index 1dcd10a0e4c..f2808189f18 100644 --- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/IpPortManager.java +++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/IpPortManager.java @@ -33,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.netflix.config.DynamicPropertyFactory; + public class IpPortManager { private static final Logger LOGGER = LoggerFactory.getLogger(IpPortManager.class); @@ -66,11 +68,22 @@ public IpPortManager(ServiceRegistryConfig serviceRegistryConfig) { throw new IllegalArgumentException("Service center address is required to start the application."); } List addresses = defaultIpPort.stream().map(IpPort::toString).collect(Collectors.toList()); - addressManger = new ServiceRegistryAddressManager(addresses, EventManager.getEventBus()); + addressManger = new ServiceRegistryAddressManager(addresses, "", "", EventManager.getEventBus()); + addressManger.constructAffinityAddress(serviceRegistryConfig.getOriginAddress(), getRegion(), getAvailableZone()); classificationAddress = new ClassificationAddress(serviceRegistryConfig, instanceCacheManager); LOGGER.info("Initial service center address is {}", getAvailableAddress()); } + private String getRegion() { + return DynamicPropertyFactory.getInstance(). + getStringProperty("servicecomb.datacenter.region", "").get(); + } + + private String getAvailableZone() { + return DynamicPropertyFactory.getInstance(). + getStringProperty("servicecomb.datacenter.availableZone", "").get(); + } + // we have to do this operation after the first time setup has already done public void initAutoDiscovery() { if (!autoDiscoveryInited && this.serviceRegistryConfig.isRegistryAutoDiscovery()) { diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/EmptyAuthHeaderProvider.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/EmptyAuthHeaderProvider.java index 37828b4ad15..79dc86cfdbc 100644 --- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/EmptyAuthHeaderProvider.java +++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/EmptyAuthHeaderProvider.java @@ -24,7 +24,7 @@ public class EmptyAuthHeaderProvider implements AuthHeaderProvider { @Override - public Map authHeaders() { + public Map authHeaders(String host) { return new HashMap<>(0); } } diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java index 28311a0ebeb..ff54002e184 100644 --- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java +++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java @@ -35,6 +35,7 @@ import org.apache.servicecomb.foundation.common.net.IpPort; import org.apache.servicecomb.foundation.common.utils.JsonUtils; import org.apache.servicecomb.foundation.vertx.AsyncResultCallback; +import org.apache.servicecomb.http.client.event.OperationEvents.UnAuthorizedOperationEvent; import org.apache.servicecomb.http.client.utils.ServiceCombServiceAvailableUtils; import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent; import org.apache.servicecomb.registry.api.registry.FindInstancesResponse; @@ -64,7 +65,6 @@ import org.apache.servicecomb.serviceregistry.client.IpPortManager; import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient; import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig; -import org.apache.servicecomb.serviceregistry.event.NotPermittedEvent; import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus; import org.apache.servicecomb.serviceregistry.task.HeartbeatResult; import org.apache.servicecomb.serviceregistry.task.MicroserviceInstanceHeartbeatTask; @@ -152,7 +152,7 @@ public Handler syncHandler(CountDownLatch countDownLatch, Clas return; } holder.setStatusCode(response.statusCode()); - sendUnAuthorizedEvent(response); + sendUnAuthorizedEvent(response, requestContext); response.exceptionHandler(e -> { LOGGER.error("error in processing response.", e); countDownLatch.countDown(); @@ -232,7 +232,7 @@ private Handler syncHandlerEx(CountDownLatch countDownLatch, Holde return; } - sendUnAuthorizedEvent(response); + sendUnAuthorizedEvent(response, requestContext); response.exceptionHandler(e -> { LOGGER.error("error in processing response.", e); countDownLatch.countDown(); @@ -247,12 +247,20 @@ private Handler syncHandlerEx(CountDownLatch countDownLatch, Holde }; } - private void sendUnAuthorizedEvent(HttpClientResponse response) { + private void sendUnAuthorizedEvent(HttpClientResponse response, RequestContext requestContext) { if (response.statusCode() == Status.UNAUTHORIZED.getStatusCode()) { - ServiceCenterEventBus.getEventBus().post(new NotPermittedEvent()); + ServiceCenterEventBus.getEventBus().post(new UnAuthorizedOperationEvent(getAddressWithProtocol(requestContext))); } } + private String getAddressWithProtocol(RequestContext requestContext) { + String ipAndPort = requestContext.getIpPort().toString(); + if (ipAndPort.startsWith("http")) { + return ipAndPort; + } + return "https://" + ipAndPort; + } + private Handler syncHandlerForInstances(CountDownLatch countDownLatch, MicroserviceInstances mInstances) { return restResponse -> { @@ -1007,7 +1015,7 @@ public Handler addressSyncHandler(CountDownLatch countDownLatc return; } holder.setStatusCode(response.statusCode()); - sendUnAuthorizedEvent(response); + sendUnAuthorizedEvent(response, restResponse.getRequestContext()); countDownLatch.countDown(); }; } diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/config/ServiceRegistryConfig.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/config/ServiceRegistryConfig.java index 4e5b3a825dd..7cf3dafe2fb 100644 --- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/config/ServiceRegistryConfig.java +++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/config/ServiceRegistryConfig.java @@ -132,6 +132,8 @@ public class ServiceRegistryConfig { private Function serviceRegistryClientConstructor = serviceRegistry -> new ServiceRegistryClientImpl(this); + private List originAddress; + public ServiceRegistryConfig() { } @@ -446,4 +448,13 @@ public ServiceRegistryConfig setServiceRegistryClientConstructor( public ServiceRegistryClient createServiceRegistryClient(ServiceRegistry serviceRegistry) { return this.serviceRegistryClientConstructor.apply(serviceRegistry); } + + public List getOriginAddress() { + return originAddress; + } + + public ServiceRegistryConfig setOriginAddress(List originAddress) { + this.originAddress = originAddress; + return this; + } } diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/config/ServiceRegistryConfigBuilder.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/config/ServiceRegistryConfigBuilder.java index ae6b7fd080f..647337db378 100644 --- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/config/ServiceRegistryConfigBuilder.java +++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/config/ServiceRegistryConfigBuilder.java @@ -46,12 +46,15 @@ class ServiceRegistryConfigBuilder { private boolean ssl; + private List originAddress; + public ServiceRegistryConfig build() { return new ServiceRegistryConfig() .setHttpVersion(getHttpVersion()) .setInstances(getInstances()) .setIpPort(getIpPort()) .setSsl(isSsl()) + .setOriginAddress(getOriginAddress()) .setClientName(RegistryHttpClientOptionsSPI.CLIENT_NAME) .setWatchClientName(RegistryWatchHttpClientOptionsSPI.CLIENT_NAME) .setConnectionTimeout(getConnectionTimeout()) @@ -110,11 +113,16 @@ public boolean isSsl() { return this.ssl; } + public List getOriginAddress() { + return originAddress; + } + public ArrayList getIpPort() { List uriList = Objects .requireNonNull(Deployment.getSystemBootStrapInfo(ServiceCenterDefaultDeploymentProvider.SYSTEM_KEY_SERVICE_CENTER), "no sc address found!") .getAccessURL(); + this.originAddress = uriList; ArrayList ipPortList = new ArrayList<>(); uriList.forEach(anUriList -> { try { diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ServiceRegistryAddressManager.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ServiceRegistryAddressManager.java index 4f25eafba44..69f11ed5cde 100644 --- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ServiceRegistryAddressManager.java +++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ServiceRegistryAddressManager.java @@ -18,11 +18,16 @@ package org.apache.servicecomb.serviceregistry.refresh; import java.net.URI; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; import org.apache.servicecomb.foundation.common.net.IpPort; +import org.apache.servicecomb.foundation.common.net.NetUtils; import org.apache.servicecomb.foundation.common.net.URIEndpointObject; import org.apache.servicecomb.http.client.common.AbstractAddressManager; +import org.apache.servicecomb.http.client.common.URLEndPoint; import org.apache.servicecomb.http.client.event.RefreshEndpointEvent; import com.google.common.eventbus.EventBus; @@ -31,8 +36,13 @@ public class ServiceRegistryAddressManager extends AbstractAddressManager { private static final String URI_PREFIX = "rest://"; - public ServiceRegistryAddressManager(List addresses, EventBus eventBus) { - super(addresses); + private static final String ZONE = "availableZone"; + + private static final String REGION = "region"; + + public ServiceRegistryAddressManager(List addresses, String ownRegion, String ownAvailableZone, + EventBus eventBus) { + super(addresses, ownRegion, ownAvailableZone); eventBus.register(this); } @@ -54,4 +64,35 @@ public IpPort transformIpPort(String address) { public void onRefreshEndpointEvent(RefreshEndpointEvent event) { refreshEndpoint(event, RefreshEndpointEvent.SERVICE_CENTER_NAME); } + + public void constructAffinityAddress(List addresses, String ownRegion, String ownAvailableZone) { + boolean isAffinityAddress = addresses.stream().anyMatch(addr -> addr.contains(ZONE) || addr.contains(REGION)); + if (!isAffinityAddress || (StringUtils.isEmpty(ownRegion) && StringUtils.isEmpty(ownAvailableZone))) { + return; + } + Set sameZone = new HashSet<>(); + Set sameRegion = new HashSet<>(); + for (String address : addresses) { + URI uri = URI.create(address); + String ipPort = NetUtils.parseIpPort(uri).toString(); + if (isMatchRegionAndZone(address, ownRegion, ownAvailableZone)) { + sameZone.add(ipPort); + } else { + sameRegion.add(ipPort); + } + } + refreshAffinityAddress(sameZone, sameRegion); + } + + private boolean isMatchRegionAndZone(String address, String ownRegion, String ownAvailableZone) { + try { + URLEndPoint endPoint = new URLEndPoint(address); + if (!StringUtils.equals(ownRegion, endPoint.getFirst(REGION))) { + return false; + } + return StringUtils.equals(ownAvailableZone, endPoint.getFirst(ZONE)); + } catch (Exception e) { + return false; + } + } } diff --git a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/client/http/MockAuthHeaderProvider.java b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/client/http/MockAuthHeaderProvider.java index ea210a6c197..64ae22092e9 100644 --- a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/client/http/MockAuthHeaderProvider.java +++ b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/client/http/MockAuthHeaderProvider.java @@ -24,7 +24,7 @@ public class MockAuthHeaderProvider implements AuthHeaderProvider { @Override - public Map authHeaders() { + public Map authHeaders(String host) { HashMap headers = new HashMap<>(); headers.put("X-Service-AK", "blah..."); return headers; diff --git a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/refresh/ServiceRegistryAddressManagerTest.java b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/refresh/ServiceRegistryAddressManagerTest.java index d6a181b4f48..ff8547c4d99 100644 --- a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/refresh/ServiceRegistryAddressManagerTest.java +++ b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/refresh/ServiceRegistryAddressManagerTest.java @@ -44,8 +44,8 @@ class ServiceRegistryAddressManagerTest { public void addressManagerTest() { IpPort ipPort = new IpPort("127.0.0.1", 30103); addresses.add(ipPort.toString()); - addressManager1 = new ServiceRegistryAddressManager(addresses, new EventBus()); - addressManager2 = new ServiceRegistryAddressManager(addresses, new EventBus()); + addressManager1 = new ServiceRegistryAddressManager(addresses, "", "", new EventBus()); + addressManager2 = new ServiceRegistryAddressManager(addresses, "", "", new EventBus()); Assertions.assertNotNull(addressManager1); Assertions.assertNotNull(addressManager2); @@ -70,7 +70,7 @@ public void onRefreshEndpointEvent() { Map> zoneAndRegion = new HashMap<>(); zoneAndRegion.put("sameZone", addressAZ); zoneAndRegion.put("sameRegion", addressRG); - addressManager1 = new ServiceRegistryAddressManager(addresses, new EventBus()); + addressManager1 = new ServiceRegistryAddressManager(addresses, "", "", new EventBus()); RefreshEndpointEvent event = new RefreshEndpointEvent(zoneAndRegion, "SERVICECENTER"); addressManager1.refreshEndpoint(event, "SERVICECENTER"); @@ -88,7 +88,7 @@ public void addressIPV6Test() { Map> zoneAndRegion = new HashMap<>(); zoneAndRegion.put("sameZone", addressAZ); zoneAndRegion.put("sameRegion", new ArrayList<>()); - addressManager1 = new ServiceRegistryAddressManager(addresses, EventManager.getEventBus()); + addressManager1 = new ServiceRegistryAddressManager(addresses, "", "", EventManager.getEventBus()); RefreshEndpointEvent event = new RefreshEndpointEvent(zoneAndRegion, "SERVICECENTER"); addressManager1.refreshEndpoint(event, "SERVICECENTER");