diff --git a/demo/demo-cse-v1/consumer/src/main/resources/application.yml b/demo/demo-cse-v1/consumer/src/main/resources/application.yml index 2b50e1a670c..7f52f380b9b 100644 --- a/demo/demo-cse-v1/consumer/src/main/resources/application.yml +++ b/demo/demo-cse-v1/consumer/src/main/resources/application.yml @@ -36,6 +36,12 @@ servicecomb: rest: address: 0.0.0.0:9092 # should be same with server.port to use web container + loadbalance: + filter: + service: + warmup: + enabled: true + routeRule: provider: | - precedence: 1 diff --git a/demo/demo-cse-v1/gateway/src/main/resources/application.yml b/demo/demo-cse-v1/gateway/src/main/resources/application.yml index 9f8b8c0b8ef..d0a6edd158b 100644 --- a/demo/demo-cse-v1/gateway/src/main/resources/application.yml +++ b/demo/demo-cse-v1/gateway/src/main/resources/application.yml @@ -35,6 +35,12 @@ servicecomb: rest: address: 0.0.0.0:9090?sslEnabled=false + loadbalance: + filter: + service: + warmup: + enabled: true + http: dispatcher: edge: diff --git a/demo/demo-cse-v1/provider-canary/src/main/resources/application.yml b/demo/demo-cse-v1/provider-canary/src/main/resources/application.yml index 81bf7aaffb7..2b9bb6f6270 100644 --- a/demo/demo-cse-v1/provider-canary/src/main/resources/application.yml +++ b/demo/demo-cse-v1/provider-canary/src/main/resources/application.yml @@ -19,6 +19,10 @@ servicecomb-config-order: 10 servicecomb: + instance: + properties: + warmupTime: 60000 + warmupCurve: 2 service: application: demo-java-chassis-cse-v1 name: provider diff --git a/demo/demo-cse-v1/provider/src/main/resources/application.yml b/demo/demo-cse-v1/provider/src/main/resources/application.yml index 370c3cd975e..10e0ebd8580 100644 --- a/demo/demo-cse-v1/provider/src/main/resources/application.yml +++ b/demo/demo-cse-v1/provider/src/main/resources/application.yml @@ -19,6 +19,10 @@ servicecomb-config-order: 10 servicecomb: + instance: + properties: + warmupTime: 30000 + warmupCurve: 2 service: application: demo-java-chassis-cse-v1 name: provider diff --git a/demo/demo-cse-v1/test-client/src/main/java/org/apache/servicecomb/samples/HelloWorldIT.java b/demo/demo-cse-v1/test-client/src/main/java/org/apache/servicecomb/samples/HelloWorldIT.java index e0f4d3d6610..b0585a908ef 100644 --- a/demo/demo-cse-v1/test-client/src/main/java/org/apache/servicecomb/samples/HelloWorldIT.java +++ b/demo/demo-cse-v1/test-client/src/main/java/org/apache/servicecomb/samples/HelloWorldIT.java @@ -41,6 +41,7 @@ public void testRestTransport() throws Exception { testHelloWorldeEptyProtectionCloseWeightLess100(); testHelloWorldEmptyProtectionCloseFallback(); testHelloWorldEmptyProtectionCloseWeight100Two(); + testHelloWorldWarmUp(); } private void testHelloWorld() { @@ -200,4 +201,25 @@ private void testHelloWorldEmptyProtectionCloseWeight100Two() { TestMgr.check(failCount == succCount, true); } + + private void testHelloWorldWarmUp() throws InterruptedException { + int oldCount = 0; + int newCount = 0; + + for (int i = 0; i < 100; i++) { + String result = template + .getForObject(Config.GATEWAY_URL + "/sayHello?name=World", String.class); + Thread.sleep(1000); + if (result.equals("\"Hello World\"")) { + oldCount++; + } else if (result.equals("\"Hello in canary World\"")) { + newCount++; + } else { + TestMgr.fail("not expected result testHelloWorldCanary"); + return; + } + } + + TestMgr.check(oldCount > newCount, true); + } } diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ServerListFilterExt.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ServerListFilterExt.java index 002bb3a92a7..124e8f13563 100644 --- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ServerListFilterExt.java +++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ServerListFilterExt.java @@ -34,12 +34,16 @@ public interface ServerListFilterExt { int ORDER_ZONE_AWARE = 200; + int ORDER_WARM_UP = Integer.MAX_VALUE; + String EMPTY_INSTANCE_PROTECTION = "servicecomb.loadbalance.filter.isolation.emptyInstanceProtectionEnabled"; String ISOLATION_FILTER_ENABLED = "servicecomb.loadbalance.filter.isolation.enabled"; String ZONE_AWARE_FILTER_ENABLED = "servicecomb.loadbalance.filter.zoneaware.enabled"; + String WARM_UP_FILTER_ENABLED = "servicecomb.loadbalance.filter.service.warmup.enabled"; + default int getOrder() { return ORDER_NORMAL; } diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/filterext/WarmUpDiscoveryFilter.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/filterext/WarmUpDiscoveryFilter.java new file mode 100644 index 00000000000..54c40d95f18 --- /dev/null +++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/filterext/WarmUpDiscoveryFilter.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.loadbalance.filterext; + +import com.netflix.config.DynamicPropertyFactory; + +import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.loadbalance.ServerListFilterExt; +import org.apache.servicecomb.loadbalance.ServiceCombServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +public class WarmUpDiscoveryFilter implements ServerListFilterExt { + private static final Logger LOGGER = LoggerFactory.getLogger(WarmUpDiscoveryFilter.class); + + private static final int INSTANCE_WEIGHT = 100; + + // Default time for warm up, the unit is milliseconds + private static final long DEFAULT_WARM_UP_TIME = 30000L; + + // Preheat calculates curve value + private static final int DEFAULT_WARM_UP_CURVE = 2; + + private static final int DEFAULT_WARM_UP_MAX_CALL = 50; + + private static final String WARM_UP_TIME = "servicecomb.loadbalance.filter.service.warmup.warm-up-time"; + + private static final String WARM_UP_CURVE = "servicecomb.loadbalance.filter.service.warmup.warm-up-curve"; + + // Maximum requests for instance that need warm up during the warm-up period + private static final String WARM_UP_MAX_CALL = "servicecomb.loadbalance.filter.service.warmup.warm-up-max-call"; + + // Maximum requests for instance that have been warmed up during the warm-up period + private static final String WARMED_MAX_CALL = "servicecomb.loadbalance.filter.service.warmup.warmed-max-call"; + + private final Random random = new Random(); + + private final Map instanceInvokeTime = new ConcurrentHashMap<>(); + + private final Map> instanceRequestTimestamps = new ConcurrentHashMap<>(); + + private final Executor warmUpExecutor = Executors.newSingleThreadExecutor((r) -> { + Thread thread = new Thread(r); + thread.setName("warm-up-cache-refresh"); + return thread; + }); + + private long refreshFlagTime = System.currentTimeMillis(); + + // 10 minutes + private static final long REFRESH_INTERVAL_TIME = 10 * 60 * 1000; + + private void refreshMapCache() { + List removeKeys = new ArrayList<>(); + for (Map.Entry entry : instanceInvokeTime.entrySet()) { + String[] ipAndPort = entry.getKey().split("@"); + if (!checkProviderExist(ipAndPort[0], Integer.parseInt(ipAndPort[1]))) { + removeKeys.add(entry.getKey()); + } + } + if (CollectionUtils.isEmpty(removeKeys)) { + return; + } + removeKeys.forEach(instanceInvokeTime::remove); + } + + @Override + public int getOrder() { + return ORDER_WARM_UP; + } + + @Override + public boolean enabled() { + return DynamicPropertyFactory.getInstance() + .getBooleanProperty(WARM_UP_FILTER_ENABLED, false) + .get(); + } + + @Override + public List getFilteredListOfServers(List servers, + Invocation invocation) { + if (servers.size() <= 1) { + return servers; + } + List notInvokedInstances = new ArrayList<>(); + List needWarmUpInstances = getNeedWarmUpInstances(servers, notInvokedInstances); + if (CollectionUtils.isEmpty(needWarmUpInstances)) { + return servers; + } + + // Refresh every 10 minutes + if (System.currentTimeMillis() - refreshFlagTime >= REFRESH_INTERVAL_TIME) { + warmUpExecutor.execute(this::refreshMapCache); + refreshFlagTime = System.currentTimeMillis(); + } + + // If exist not invoked instances, choose one that let it into warm up. + if (!CollectionUtils.isEmpty(notInvokedInstances)) { + setInstanceStartInvokeTime(notInvokedInstances.get(0)); + setInstanceRequestTimestamps(notInvokedInstances.get(0)); + return Collections.singletonList(notInvokedInstances.get(0)); + } + + // check instances allow request + List callableServers = getCallableServers(servers, needWarmUpInstances); + if (callableServers.size() == 1) { + setInstanceRequestTimestamps(callableServers.get(0)); + return callableServers; + } + if (CollectionUtils.isEmpty(callableServers)) { + callableServers = servers; + } + int[] weights = new int[servers.size()]; + int totalWeight = 0; + int index = 0; + for (ServiceCombServer server : callableServers) { + weights[index] = calculateWeight(server.getInstance().getInstanceId()); + totalWeight += weights[index++]; + } + return chooseServer(totalWeight, weights, servers); + } + + private List getNeedWarmUpInstances(List servers, + List notInvokedInstances) { + return servers.stream() + .filter(server -> isInstanceNeedWarmUp(server, notInvokedInstances)) + .collect(Collectors.toList()); + } + + private boolean isInstanceNeedWarmUp(ServiceCombServer server, List notInvokedInstances) { + Long invokeTime = instanceInvokeTime.get(buildCacheKey(server)); + + // instance have not been invoked need to be warn-up + if (invokeTime == null) { + notInvokedInstances.add(server); + return true; + } + return System.currentTimeMillis() - invokeTime < getWarmUpTime(); + } + + private List chooseServer(int totalWeight, int[] weights, List servers) { + if (totalWeight <= 0) { + return servers; + } + int position = random.nextInt(totalWeight); + for (int i = 0; i < weights.length; i++) { + position -= weights[i]; + if (position < 0) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("warm up choose service instance: " + servers.get(i).getInstance().getInstanceId()); + } + setInstanceRequestTimestamps(servers.get(i)); + return Collections.singletonList(servers.get(i)); + } + } + return servers; + } + + private void setInstanceStartInvokeTime(ServiceCombServer server) { + instanceInvokeTime.putIfAbsent(buildCacheKey(server), System.currentTimeMillis()); + } + + private String buildCacheKey(ServiceCombServer server) { + return server.getInstance().getHostName() + "@" + server.getPort(); + } + + private int calculateWeight(String instanceId) { + long warmUpTime = getWarmUpTime(); + if (warmUpTime <= 0 || instanceInvokeTime.get(instanceId) == null) { + return INSTANCE_WEIGHT; + } + long invokeStartTime = instanceInvokeTime.get(instanceId); + + // calculated in milliseconds + final long runTime = System.currentTimeMillis() - invokeStartTime; + if (runTime > 0 && runTime < warmUpTime) { + return calculateWarmUpWeight(runTime, warmUpTime); + } + return INSTANCE_WEIGHT; + } + + private int calculateWarmUpWeight(double runtime, double warmUpTime) { + int warmUpCurve = getWarmUpCurve(); + final int round = (int) Math.round(Math.pow(runtime / warmUpTime, warmUpCurve) * INSTANCE_WEIGHT); + return round < 1 ? 1 : Math.min(round, INSTANCE_WEIGHT); + } + + private boolean checkProviderExist(String host, int port) { + try (Socket s = new Socket()) { + s.connect(new InetSocketAddress(host, port), 3000); + return true; + } catch (IOException e) { + return false; + } + } + + public boolean allowRequest(Map requestTimestamps, int maxRequests) { + // if maxRequests less or equal 0 that mean not need limit + if (maxRequests <= 0) { + return true; + } + long currentTime = System.currentTimeMillis(); + long warmUpTime = getWarmUpTime(); + requestTimestamps.entrySet().removeIf(entry -> currentTime - entry.getKey() > warmUpTime); + return requestTimestamps.size() <= maxRequests; + } + + private List getCallableServers(List servers, + List needWarmUpInstances) { + List instanceIds = new ArrayList<>(); + List result = new ArrayList<>(); + + // check need warm up instance current requests. + int warmUpMaxCall = getWarmUpMaxCall(); + needWarmUpInstances.forEach(server -> { + String instanceId = server.getInstance().getInstanceId(); + instanceIds.add(instanceId); + if (allowRequest(instanceRequestTimestamps.get(instanceId), warmUpMaxCall)) { + result.add(server); + } + }); + int warmedMaxCall = getWarmedMaxCall(warmUpMaxCall * 5); + + // check warmed instances requests + servers.forEach(server -> { + String instanceId = server.getInstance().getInstanceId(); + if (!instanceIds.contains(instanceId)) { + if (allowRequest(instanceRequestTimestamps.get(instanceId), warmedMaxCall)) { + result.add(server); + } + } + }); + return result; + } + + private void setInstanceRequestTimestamps(ServiceCombServer server) { + String instanceId = server.getInstance().getInstanceId(); + if (instanceRequestTimestamps.get(instanceId) == null) { + instanceRequestTimestamps.put(instanceId, new ConcurrentHashMap<>()); + } + instanceRequestTimestamps.get(instanceId).put(System.currentTimeMillis(), 1); + } + + private long getWarmUpTime() { + return DynamicPropertyFactory.getInstance() + .getLongProperty(WARM_UP_TIME, DEFAULT_WARM_UP_TIME) + .get(); + } + + private int getWarmUpCurve() { + return DynamicPropertyFactory.getInstance() + .getIntProperty(WARM_UP_CURVE, DEFAULT_WARM_UP_CURVE) + .get(); + } + + private int getWarmUpMaxCall() { + return DynamicPropertyFactory.getInstance() + .getIntProperty(WARM_UP_MAX_CALL, DEFAULT_WARM_UP_MAX_CALL) + .get(); + } + + private int getWarmedMaxCall(int defaultValue) { + return DynamicPropertyFactory.getInstance() + .getIntProperty(WARMED_MAX_CALL, defaultValue) + .get(); + } +} diff --git a/handlers/handler-loadbalance/src/main/resources/META-INF/services/org.apache.servicecomb.loadbalance.ServerListFilterExt b/handlers/handler-loadbalance/src/main/resources/META-INF/services/org.apache.servicecomb.loadbalance.ServerListFilterExt index a355341ff46..53676e8b1f8 100644 --- a/handlers/handler-loadbalance/src/main/resources/META-INF/services/org.apache.servicecomb.loadbalance.ServerListFilterExt +++ b/handlers/handler-loadbalance/src/main/resources/META-INF/services/org.apache.servicecomb.loadbalance.ServerListFilterExt @@ -16,4 +16,5 @@ # org.apache.servicecomb.loadbalance.filterext.IsolationServerListFilterExt -org.apache.servicecomb.loadbalance.filterext.ZoneAwareDiscoveryFilter \ No newline at end of file +org.apache.servicecomb.loadbalance.filterext.ZoneAwareDiscoveryFilter +org.apache.servicecomb.loadbalance.filterext.WarmUpDiscoveryFilter \ No newline at end of file