2929import java .util .concurrent .ExecutorService ;
3030import java .util .concurrent .LinkedBlockingQueue ;
3131import java .util .concurrent .ScheduledExecutorService ;
32+ import java .util .concurrent .ScheduledFuture ;
3233import java .util .concurrent .ScheduledThreadPoolExecutor ;
3334import java .util .concurrent .ThreadPoolExecutor ;
3435import java .util .concurrent .TimeUnit ;
7879import com .cloud .agent .transport .Request ;
7980import com .cloud .agent .transport .Response ;
8081import com .cloud .alert .AlertManager ;
82+ import com .cloud .cluster .ManagementServerHostVO ;
83+ import com .cloud .cluster .dao .ManagementServerHostDao ;
8184import com .cloud .configuration .ManagementServiceConfiguration ;
8285import com .cloud .dc .ClusterVO ;
8386import com .cloud .dc .DataCenterVO ;
106109import com .cloud .utils .concurrency .NamedThreadFactory ;
107110import com .cloud .utils .db .DB ;
108111import com .cloud .utils .db .EntityManager ;
112+ import com .cloud .utils .db .Filter ;
109113import com .cloud .utils .db .QueryBuilder ;
110114import com .cloud .utils .db .SearchCriteria .Op ;
111115import com .cloud .utils .db .TransactionLegacy ;
@@ -163,6 +167,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
163167 protected ConfigurationDao _configDao = null ;
164168 @ Inject
165169 protected ClusterDao _clusterDao = null ;
170+ @ Inject
171+ protected ManagementServerHostDao _msHostDao ;
166172
167173 @ Inject
168174 protected HighAvailabilityManager _haMgr = null ;
@@ -184,12 +190,16 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
184190 protected ScheduledExecutorService _directAgentExecutor ;
185191 protected ScheduledExecutorService _cronJobExecutor ;
186192 protected ScheduledExecutorService _monitorExecutor ;
193+ protected ScheduledExecutorService _scanHostsExecutor ;
194+ protected ScheduledExecutorService _investigatorExecutor ;
187195
188196 private int _directAgentThreadCap ;
189197
190198 protected StateMachine2 <Status , Status .Event , Host > _statusStateMachine = Status .getStateMachine ();
191199 private final ConcurrentHashMap <Long , Long > _pingMap = new ConcurrentHashMap <Long , Long >(10007 );
192200
201+ private final ConcurrentHashMap <Long , ScheduledFuture <?>> _investigateTasksMap = new ConcurrentHashMap <>();
202+
193203 @ Inject
194204 ResourceManager _resourceMgr ;
195205 @ Inject
@@ -209,6 +219,14 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
209219 protected final ConfigKey <Boolean > CheckTxnBeforeSending = new ConfigKey <Boolean >("Developer" , Boolean .class , "check.txn.before.sending.agent.commands" , "false" ,
210220 "This parameter allows developers to enable a check to see if a transaction wraps commands that are sent to the resource. This is not to be enabled on production systems." , true );
211221
222+ protected final ConfigKey <Boolean > InvestigateDisconnectedHosts = new ConfigKey <>("Advanced" , Boolean .class , "investigate.disconnected.hosts" ,
223+ "false" , "Determines whether to investigate VMs on disconnected hosts" , false );
224+ protected final ConfigKey <Integer > InvestigateDisconnectedHostsInterval = new ConfigKey <>("Advanced" , Integer .class , "investigate.disconnected.hosts.interval" ,
225+ "300" , "The time (in seconds) between VM investigation on disconnected hosts." , false );
226+ protected final ConfigKey <Integer > InvestigateDisconnectedHostsPoolSize = new ConfigKey <Integer >("Advanced" , Integer .class , "investigate.disconnected.hosts.pool.size" , "10" ,
227+ "Default pool size to investigate disconnected hosts" , false );
228+
229+
212230 @ Override
213231 public boolean configure (final String name , final Map <String , Object > params ) throws ConfigurationException {
214232
@@ -244,6 +262,9 @@ public boolean configure(final String name, final Map<String, Object> params) th
244262
245263 _monitorExecutor = new ScheduledThreadPoolExecutor (1 , new NamedThreadFactory ("AgentMonitor" ));
246264
265+ _scanHostsExecutor = new ScheduledThreadPoolExecutor (1 , new NamedThreadFactory ("HostsScanner" ));
266+ _investigatorExecutor = new ScheduledThreadPoolExecutor (InvestigateDisconnectedHostsPoolSize .value (), new NamedThreadFactory ("DisconnectHostsInvestigator" ));
267+
247268 return true ;
248269 }
249270
@@ -620,6 +641,10 @@ public boolean start() {
620641
621642 _monitorExecutor .scheduleWithFixedDelay (new MonitorTask (), mgmtServiceConf .getPingInterval (), mgmtServiceConf .getPingInterval (), TimeUnit .SECONDS );
622643
644+ if (InvestigateDisconnectedHosts .value ()) {
645+ _scanHostsExecutor .scheduleAtFixedRate (new ScanDisconnectedHostsTask (), 60 , 60 , TimeUnit .SECONDS );
646+ }
647+
623648 return true ;
624649 }
625650
@@ -779,6 +804,8 @@ public boolean stop() {
779804
780805 _connectExecutor .shutdownNow ();
781806 _monitorExecutor .shutdownNow ();
807+ _investigatorExecutor .shutdownNow ();
808+
782809 return true ;
783810 }
784811
@@ -1731,7 +1758,8 @@ public String getConfigComponentName() {
17311758 @ Override
17321759 public ConfigKey <?>[] getConfigKeys () {
17331760 return new ConfigKey <?>[] { CheckTxnBeforeSending , Workers , Port , Wait , AlertWait , DirectAgentLoadSize , DirectAgentPoolSize ,
1734- DirectAgentThreadCap };
1761+ DirectAgentThreadCap ,
1762+ InvestigateDisconnectedHosts , InvestigateDisconnectedHostsInterval , InvestigateDisconnectedHostsPoolSize };
17351763 }
17361764
17371765 protected class SetHostParamsListener implements Listener {
@@ -1850,4 +1878,91 @@ public void propagateChangeToAgents(Map<String, String> params) {
18501878 sendCommandToAgents (hostsPerZone , params );
18511879 }
18521880 }
1881+
1882+ protected class ScanDisconnectedHostsTask extends ManagedContextRunnable {
1883+
1884+ @ Override
1885+ protected void runInContext () {
1886+ try {
1887+ ManagementServerHostVO msHost = _msHostDao .findOneInUpState (new Filter (ManagementServerHostVO .class , "id" , true , 0L , 1L ));
1888+ if (msHost == null || (msHost .getMsid () != _nodeId )) {
1889+ s_logger .debug ("Skipping disconnected hosts scan task" );
1890+ for (Long hostId : _investigateTasksMap .keySet ()) {
1891+ cancelInvestigationTask (hostId );
1892+ }
1893+ return ;
1894+ }
1895+ for (HostVO host : _hostDao .listByType (Host .Type .Routing )) {
1896+ if (host .getStatus () == Status .Disconnected ) {
1897+ scheduleInvestigationTask (host .getId ());
1898+ }
1899+ }
1900+ } catch (final Exception e ) {
1901+ s_logger .error ("Exception caught while scanning disconnected hosts : " , e );
1902+ }
1903+ }
1904+ }
1905+
1906+ protected class InvestigationTask extends ManagedContextRunnable {
1907+ Long _hostId ;
1908+
1909+ InvestigationTask (final Long hostId ) {
1910+ _hostId = hostId ;
1911+ }
1912+
1913+ @ Override
1914+ protected void runInContext () {
1915+ try {
1916+ final long hostId = _hostId ;
1917+ s_logger .info ("Investigating host " + hostId + " to determine its actual state" );
1918+ HostVO host = _hostDao .findById (hostId );
1919+ if (host == null ) {
1920+ s_logger .info ("Cancelling investigation on host " + hostId + " which might has been removed" );
1921+ cancelInvestigationTask (hostId );
1922+ return ;
1923+ }
1924+ if (host .getStatus () != Status .Disconnected ) {
1925+ s_logger .info ("Cancelling investigation on host " + hostId + " in status " + host .getStatus ());
1926+ cancelInvestigationTask (hostId );
1927+ return ;
1928+ }
1929+ Status determinedState = _haMgr .investigate (hostId );
1930+ s_logger .info ("Investigators determine the status of host " + hostId + " is " + determinedState );
1931+ if (determinedState == Status .Down ) {
1932+ agentStatusTransitTo (host , Status .Event .HostDown , _nodeId );
1933+ s_logger .info ("Scheduling VMs restart on host " + hostId + " which is Down" );
1934+ _haMgr .scheduleRestartForVmsOnHost (host , true );
1935+ s_logger .info ("Cancelling investigation on host " + hostId + " which is Down" );
1936+ cancelInvestigationTask (hostId );
1937+ }
1938+ } catch (final Exception e ) {
1939+ s_logger .error ("Exception caught while handling investigation task: " , e );
1940+ }
1941+ }
1942+ }
1943+
1944+ private void scheduleInvestigationTask (final Long hostId ) {
1945+ ScheduledFuture future = _investigateTasksMap .get (hostId );
1946+ if (future != null ) {
1947+ s_logger .info ("There is already a task to investigate host " + hostId );
1948+ } else {
1949+ ScheduledFuture scheduledFuture = _investigatorExecutor .scheduleWithFixedDelay (new InvestigationTask (hostId ), InvestigateDisconnectedHostsInterval .value (),
1950+ InvestigateDisconnectedHostsInterval .value (), TimeUnit .SECONDS );
1951+ _investigateTasksMap .put (hostId , scheduledFuture );
1952+ s_logger .info ("Scheduled a task to investigate host " + hostId );
1953+ }
1954+ }
1955+
1956+ private void cancelInvestigationTask (final Long hostId ) {
1957+ ScheduledFuture future = _investigateTasksMap .get (hostId );
1958+ if (future != null ) {
1959+ try {
1960+ future .cancel (false );
1961+ s_logger .info ("Cancelled a task to investigate host " + hostId );
1962+ _investigateTasksMap .remove (hostId );
1963+ } catch (Exception e ) {
1964+ s_logger .error ("Exception caught while cancelling investigation task: " , e );
1965+ }
1966+ }
1967+ }
18531968}
0 commit comments