Skip to content

Commit b6acc4e

Browse files
committed
优化 客户端 通知 负债均衡
1 parent a69d37d commit b6acc4e

File tree

5 files changed

+50
-16
lines changed

5 files changed

+50
-16
lines changed

job-core/src/main/java/com/lts/job/core/remoting/HeartBeatMonitor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.lts.job.core.remoting;
22

3+
import com.lts.job.core.cluster.NodeType;
34
import com.lts.job.core.exception.JobTrackerNotFoundException;
5+
import com.lts.job.core.support.Application;
46
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
68

@@ -26,8 +28,13 @@ public HeartBeatMonitor(RemotingClientDelegate remotingClient) {
2628
}
2729

2830
public void start() {
31+
// 设置 JobClient 心跳10s一次,TaskTracker 5s一次
32+
int delay = 5;
33+
if(NodeType.CLIENT.equals(Application.Config.getNodeType())){
34+
delay = 10;
35+
}
2936
HEART_BEAT_EXECUTOR_SERVICE.scheduleWithFixedDelay(
30-
new HeartBeatRunner(), 5, 5, TimeUnit.SECONDS); // 5s发送一次心跳
37+
new HeartBeatRunner(), 5, delay, TimeUnit.SECONDS);
3138
}
3239

3340
public void destroy() {

job-core/src/main/java/com/lts/job/core/repository/JobFeedbackQueueMongoRepository.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ public void delJobFeedback(String id) {
1616
ds.delete(query);
1717
}
1818

19+
public long count(){
20+
Query<JobFeedbackQueuePo> query = createQuery();
21+
return ds.getCount(query);
22+
}
23+
1924
public List<JobFeedbackQueuePo> get(int offset, int limit){
2025
Query<JobFeedbackQueuePo> query = createQuery();
2126
query.order("gmtCreated").offset(offset).limit(limit);

job-core/target/maven-archiver/pom.properties

Lines changed: 0 additions & 5 deletions
This file was deleted.

job-tracker/src/main/java/com/lts/job/tracker/support/FeedbackJobSendChecker.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ private class Runner implements Runnable {
8383
@Override
8484
public void run() {
8585
try {
86+
87+
long count = jobFeedbackQueueMongoRepository.count();
88+
if (count == 0) {
89+
return;
90+
}
91+
LOGGER.info("一共有{}个完成的任务要通知客户端.", count);
92+
8693
List<JobFeedbackQueuePo> jobFeedbackQueuePos;
8794
int offset = 0;
8895
int limit = 5;
@@ -91,7 +98,6 @@ public void run() {
9198
if (CollectionUtils.isEmpty(jobFeedbackQueuePos)) {
9299
return;
93100
}
94-
LOGGER.info("一共有{}个完成的任务要通知客户端.", jobFeedbackQueuePos.size());
95101
List<JobResult> jobResults = new ArrayList<JobResult>(jobFeedbackQueuePos.size());
96102
for (JobFeedbackQueuePo jobFeedbackQueuePo : jobFeedbackQueuePos) {
97103
jobResults.add(jobFeedbackQueuePo);

job-tracker/src/main/java/com/lts/job/tracker/support/JobClientManager.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@
99
import org.slf4j.Logger;
1010
import org.slf4j.LoggerFactory;
1111

12+
import java.util.ArrayList;
13+
import java.util.List;
1214
import java.util.concurrent.ConcurrentHashMap;
15+
import java.util.concurrent.ThreadLocalRandom;
1316

1417
/**
1518
* @author Robert HG (254963746@qq.com) on 8/17/14.
16-
* 客户端节点管理
19+
* 客户端节点管理
1720
*/
1821
public class JobClientManager {
1922

@@ -36,7 +39,7 @@ public void addNode(Node node) {
3639
ChannelWrapper channel = ChannelManager.getChannel(node.getGroup(), node.getNodeType(), node.getIdentity());
3740
ConcurrentHashSet<JobClientNode> jobClientNodes = NODE_MAP.get(node.getGroup());
3841

39-
synchronized (NODE_MAP){
42+
synchronized (NODE_MAP) {
4043
if (jobClientNodes == null) {
4144
jobClientNodes = new ConcurrentHashSet<JobClientNode>();
4245
NODE_MAP.put(node.getGroup(), jobClientNodes);
@@ -58,7 +61,7 @@ public void removeNode(Node node) {
5861
ConcurrentHashSet<JobClientNode> jobClientNodes = NODE_MAP.get(node.getGroup());
5962
if (jobClientNodes != null && jobClientNodes.size() != 0) {
6063
for (JobClientNode jobClientNode : jobClientNodes) {
61-
if(node.getIdentity().equals(jobClientNode.getIdentity())){
64+
if (node.getIdentity().equals(jobClientNode.getIdentity())) {
6265
LOGGER.info("删除JobClient节点:{}", jobClientNode);
6366
jobClientNodes.remove(jobClientNode);
6467
}
@@ -68,31 +71,49 @@ public void removeNode(Node node) {
6871

6972
/**
7073
* 得到 可用的 客户端节点
74+
*
7175
* @param nodeGroup
7276
* @return
7377
*/
7478
public JobClientNode getAvailableJobClient(String nodeGroup) {
7579

7680
ConcurrentHashSet<JobClientNode> jobClientNodes = NODE_MAP.get(nodeGroup);
81+
7782
if (jobClientNodes == null || jobClientNodes.size() == 0) {
7883
return null;
7984
}
8085

81-
for (JobClientNode jobClientNode : jobClientNodes) {
86+
int size = jobClientNodes.size();
87+
int index = getRandomIndex(size);
88+
89+
List<JobClientNode> list = new ArrayList<JobClientNode>(jobClientNodes);
90+
91+
JobClientNode jobClientNode = null;
92+
int retry = 0;
93+
while (jobClientNode == null && retry < size) {
94+
jobClientNode = list.get(index);
8295
// 如果 channel 已经关闭, 更新channel, 如果没有channel, 略过
83-
if (jobClientNode.getChannel() == null || jobClientNode.getChannel().isClosed()) {
96+
if (jobClientNode != null && (jobClientNode.getChannel() == null || jobClientNode.getChannel().isClosed())) {
8497
ChannelWrapper channel = ChannelManager.getChannel(jobClientNode.getNodeGroup(), NodeType.CLIENT, jobClientNode.getIdentity());
8598
if (channel != null) {
8699
// 更新channel
87100
jobClientNode.setChannel(channel);
88101
} else {
89-
continue;
102+
jobClientNode = null;
90103
}
91104
}
92-
93-
return jobClientNode;
105+
index = (index + 1) % size;
106+
retry++;
94107
}
95108

96-
return null;
109+
return jobClientNode;
110+
}
111+
112+
private int getRandomIndex(int size) {
113+
int min = 1;
114+
int max = size;
115+
ThreadLocalRandom random = ThreadLocalRandom.current();
116+
return random.nextInt(max) % (max - min + 1) + min - 1;
97117
}
118+
98119
}

0 commit comments

Comments
 (0)