Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,47 +22,58 @@
public class ResourceDestinationMakerImpl implements ManagementNodeChangeListener, ResourceDestinationMaker {
private final ConsistentHash<String> nodeHash = new ConsistentHash<>(new ApacheHash(), 500, new ArrayList<String>()) ;
private final Map<String, NodeInfo> nodes = new HashMap<>();
private final Object lock = new Object();

@Autowired
private DatabaseFacade dbf;

@Override
public void nodeJoin(ManagementNodeInventory inv) {
nodeHash.add(inv.getUuid());
nodes.put(inv.getUuid(), new NodeInfo(inv));
synchronized (lock) {
nodeHash.add(inv.getUuid());
nodes.put(inv.getUuid(), new NodeInfo(inv));
}
}

@Override
public void nodeLeft(ManagementNodeInventory inv) {
String nodeId = inv.getUuid();
nodeHash.remove(nodeId);
nodes.remove(nodeId);
synchronized (lock) {
String nodeId = inv.getUuid();
nodeHash.remove(nodeId);
nodes.remove(nodeId);
}
}

@Override
public void iAmDead(ManagementNodeInventory inv) {
String nodeId = inv.getUuid();
nodeHash.remove(nodeId);
nodes.remove(nodeId);
synchronized (lock) {
String nodeId = inv.getUuid();
nodeHash.remove(nodeId);
nodes.remove(nodeId);
}
}

@Override
public void iJoin(ManagementNodeInventory inv) {
List<ManagementNodeVO> lst = Q.New(ManagementNodeVO.class).list();
lst.forEach((ManagementNodeVO node) -> {
nodeHash.add(node.getUuid());
nodes.put(node.getUuid(), new NodeInfo(node));
});
synchronized (lock) {
List<ManagementNodeVO> lst = Q.New(ManagementNodeVO.class).list();
lst.forEach((ManagementNodeVO node) -> {
nodeHash.add(node.getUuid());
nodes.put(node.getUuid(), new NodeInfo(node));
});
}
}

@Override
public String makeDestination(String resourceUuid) {
String nodeUuid = nodeHash.get(resourceUuid);
if (nodeUuid == null) {
throw new CloudRuntimeException("Cannot find any available management node to send message");
}
synchronized (lock) {
String nodeUuid = nodeHash.get(resourceUuid);
if (nodeUuid == null) {
throw new CloudRuntimeException("Cannot find any available management node to send message");
}

return nodeUuid;
return nodeUuid;
}
}

@Override
Expand All @@ -73,37 +84,48 @@ public boolean isManagedByUs(String resourceUuid) {

@Override
public Collection<String> getManagementNodesInHashRing() {
return nodeHash.getNodes();
synchronized (lock) {
return new ArrayList<>(nodeHash.getNodes());
}
}

@Override
public NodeInfo getNodeInfo(String nodeUuid) {
NodeInfo info = nodes.get(nodeUuid);
if (info == null) {
ManagementNodeVO vo = dbf.findByUuid(nodeUuid, ManagementNodeVO.class);
if (vo == null) {
throw new ManagementNodeNotFoundException(nodeUuid);
synchronized (lock) {
NodeInfo info = nodes.get(nodeUuid);
if (info == null) {
ManagementNodeVO vo = dbf.findByUuid(nodeUuid, ManagementNodeVO.class);
if (vo == null) {
throw new ManagementNodeNotFoundException(nodeUuid);
}

nodeHash.add(nodeUuid);
info = new NodeInfo(vo);
nodes.put(nodeUuid, info);
}

nodeHash.add(nodeUuid);
info = nodes.put(nodeUuid, new NodeInfo(vo));
return info;
}

return info;
}

@Override
public Collection<NodeInfo> getAllNodeInfo() {
return nodes.values();
synchronized (lock) {
return new ArrayList<>(nodes.values());
}
}

@Override
public int getManagementNodeCount() {
return nodes.values().size();
synchronized (lock) {
return nodes.size();
}
}


public boolean isNodeInCircle(String nodeId) {
return nodeHash.hasNode(nodeId);
synchronized (lock) {
return nodeHash.hasNode(nodeId);
}
}
}