Skip to content

Commit 3fe8af5

Browse files
committed
Merge pull request #33 from qq254963746/develop
LTS admin 日志查询, 各种任务列表展示 和 查询 mongo实现,修复 ping bug, 推荐使用mongo做任务队列
2 parents 632c3d4 + 2a96e7e commit 3fe8af5

File tree

28 files changed

+306
-103
lines changed

28 files changed

+306
-103
lines changed

lts-admin/src/main/java/com/lts/job/web/controller/api/JobQueueApiController.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22

33
import com.lts.job.biz.logger.domain.JobLogPo;
44
import com.lts.job.biz.logger.domain.JobLoggerRequest;
5-
import com.lts.job.core.commons.utils.*;
5+
import com.lts.job.core.commons.utils.Assert;
6+
import com.lts.job.core.commons.utils.DateUtils;
7+
import com.lts.job.core.commons.utils.StringUtils;
68
import com.lts.job.core.domain.JobQueueRequest;
79
import com.lts.job.core.domain.PageResponse;
810
import com.lts.job.core.support.CronExpression;
911
import com.lts.job.queue.domain.JobPo;
10-
import com.lts.job.queue.exception.DuplicateJobException;
1112
import com.lts.job.web.cluster.AdminApplication;
1213
import com.lts.job.web.controller.AbstractController;
1314
import com.lts.job.web.vo.RestfulResponse;

lts-admin/src/main/resources/lts-admin-config.properties

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@ registry.address=zookeeper://127.0.0.1:2181
55
clusterNames=test_cluster
66

77
#config. 开头的配置都将会配置进 Config 中
8-
config.job.queue=mysql
9-
config.job.logger=mysql
10-
config.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
11-
config.jdbc.username=root
12-
config.jdbc.password=root
8+
#config.job.queue=mysql
9+
#config.job.logger=mysql
10+
#config.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
11+
#config.jdbc.username=root
12+
#config.jdbc.password=root
13+
14+
config.job.queue=mongo
15+
config.job.logger=mongo
16+
mongo.addresses=127.0.0.1:27017
17+
mongo.database=lts
1318

1419
#config.zk.client=zkclient
1520
#config.zk.client=curator

lts-admin/src/main/webapp/main/cron-job-queue.jsp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@
166166
{
167167
title: '操作', dataIndex: '', width: 90, sortable: false, renderer: function (value, obj) {
168168
var logUrl = "/job-logger/job-logger.htm?taskId=" + obj.taskId + "&taskTrackerNodeGroup=" + obj.taskTrackerNodeGroup;
169-
return '<a target="_blank" href="'+ logUrl +'" class="job-logger-btn" taskId="' + obj.taskId + '" taskTrackerNodeGroup="' + obj.taskTrackerNodeGroup + '">日志</a>&nbsp;' +
169+
return '<a target="_blank" href="'+ logUrl +'">日志</a>&nbsp;' +
170170
'<a href="javascript:;" class="job-edit-btn">编辑<span class="hidden">' + JSON.stringify(obj) + '</span></a>&nbsp;' +
171171
'<a href="javascript:;" class="job-del-btn" jobId="' + obj.jobId + '">删除</a>';
172172
}

lts-admin/src/main/webapp/main/executable-job-queue.jsp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,7 @@
108108
{
109109
title: '操作', dataIndex: '', width: 90, sortable: false, renderer: function (value, obj) {
110110
var logUrl = "/job-logger/job-logger.htm?taskId=" + obj.taskId + "&taskTrackerNodeGroup=" + obj.taskTrackerNodeGroup;
111-
return '<a target="_blank" href="'+ logUrl +'" class="job-logger-btn" taskId="' + obj.taskId + '" taskTrackerNodeGroup="' + obj.taskTrackerNodeGroup + '">日志</a>&nbsp;' +
112-
'<a href="javascript:;" class="job-edit-btn">编辑<span class="hidden">' + JSON.stringify(obj) + '</span></a>&nbsp;' +
113-
'<a href="javascript:;" class="job-del-btn" jobId="' + obj.jobId + '">删除</a>';
111+
return '<a target="_blank" href="'+ logUrl +'">日志</a>';
114112
}
115113
}
116114
];

lts-admin/src/main/webapp/main/executing-job-queue.jsp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
{
113113
title: '操作', dataIndex: '', width: 40, sortable: false, renderer: function (value, obj) {
114114
var logUrl = "/job-logger/job-logger.htm?taskId=" + obj.taskId + "&taskTrackerNodeGroup=" + obj.taskTrackerNodeGroup;
115-
return '<a target="_blank" href="'+ logUrl +'" class="job-logger-btn" taskId="' + obj.taskId + '" taskTrackerNodeGroup="' + obj.taskTrackerNodeGroup + '">日志</a>&nbsp;';
115+
return '<a target="_blank" href="'+ logUrl +'">日志</a>&nbsp;';
116116
}
117117
}
118118
];

lts-admin/src/main/webapp/main/job-logger.jsp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,10 @@
9898
{title: '提交节点组', dataIndex: 'submitNodeGroup', sortable: false, width: 150},
9999
{
100100
title: '执行时间', dataIndex: 'triggerTime', sortable: false, width: 125, renderer: function (v) {
101-
return DateUtil.formatYMDHMD(v);
101+
if(v){
102+
return DateUtil.formatYMDHMD(v);
103+
}
104+
return v;
102105
}
103106
},
104107
{title: 'cron表达式', dataIndex: 'cronExpression', sortable: false, width: 100},
@@ -117,7 +120,7 @@
117120
},
118121
{
119122
title: '内容', dataIndex: 'msg', sortable: false, width: 200
120-
},
123+
}
121124
];
122125
123126
var store = new Data.Store({

lts-core/src/main/java/com/lts/job/core/remoting/HeartBeatMonitor.java

Lines changed: 77 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,17 @@ public class HeartBeatMonitor {
3939

4040
private RemotingClientDelegate remotingClient;
4141
private Application application;
42-
private HeartBeat heartBeat;
4342
private EventSubscriber eventSubscriber;
4443

4544
public HeartBeatMonitor(RemotingClientDelegate remotingClient, Application application) {
4645
this.remotingClient = remotingClient;
4746
this.application = application;
48-
this.heartBeat = new HeartBeat();
4947
this.eventSubscriber = new EventSubscriber("PING_" + application.getConfig().getIdentity(),
5048
new Observer() {
5149
@Override
5250
public void onObserved(EventInfo eventInfo) {
5351
startFastPing();
52+
stopPing();
5453
}
5554
});
5655
}
@@ -72,8 +71,17 @@ private void startPing() {
7271
if (pingStart.compareAndSet(false, true)) {
7372
// 用来监听 JobTracker不可用的消息,然后马上启动 快速检查定时器
7473
application.getEventCenter().subscribe(eventSubscriber, EcTopic.NO_JOB_TRACKER_AVAILABLE);
75-
pingScheduledFuture = PING_EXECUTOR_SERVICE.scheduleWithFixedDelay(
76-
heartBeat, 30, 30, TimeUnit.SECONDS); // 30s 一次心跳
74+
if (pingScheduledFuture == null) {
75+
pingScheduledFuture = PING_EXECUTOR_SERVICE.scheduleWithFixedDelay(
76+
new Runnable() {
77+
@Override
78+
public void run() {
79+
if (pingStart.get()) {
80+
ping();
81+
}
82+
}
83+
}, 30, 30, TimeUnit.SECONDS); // 30s 一次心跳
84+
}
7785
LOGGER.info("Start heart beat monitor success!");
7886
}
7987
} catch (Throwable t) {
@@ -84,8 +92,8 @@ private void startPing() {
8492
private void stopPing() {
8593
try {
8694
if (pingStart.compareAndSet(true, false)) {
87-
pingScheduledFuture.cancel(true);
88-
PING_EXECUTOR_SERVICE.shutdown();
95+
// pingScheduledFuture.cancel(true);
96+
// PING_EXECUTOR_SERVICE.shutdown();
8997
application.getEventCenter().unSubscribe(EcTopic.NO_JOB_TRACKER_AVAILABLE, eventSubscriber);
9098
LOGGER.info("Stop heart beat monitor success!");
9199
}
@@ -98,8 +106,17 @@ private void startFastPing() {
98106
if (fastPingStart.compareAndSet(false, true)) {
99107
try {
100108
// 2s 一次进行检查
101-
fastPingScheduledFuture = FAST_PING_EXECUTOR.scheduleWithFixedDelay(
102-
heartBeat, 1, 2, TimeUnit.MILLISECONDS);
109+
if (fastPingScheduledFuture == null) {
110+
fastPingScheduledFuture = FAST_PING_EXECUTOR.scheduleWithFixedDelay(
111+
new Runnable() {
112+
@Override
113+
public void run() {
114+
if (fastPingStart.get()) {
115+
ping();
116+
}
117+
}
118+
}, 1, 2, TimeUnit.MILLISECONDS);
119+
}
103120
LOGGER.info("Start fast ping runner success!");
104121
} catch (Throwable t) {
105122
LOGGER.error("Start fast ping runner failed!", t);
@@ -110,84 +127,80 @@ private void startFastPing() {
110127
private void stopFastPing() {
111128
try {
112129
if (fastPingStart.compareAndSet(true, false)) {
113-
fastPingScheduledFuture.cancel(true);
114-
FAST_PING_EXECUTOR.shutdown();
130+
// fastPingScheduledFuture.cancel(true);
131+
// FAST_PING_EXECUTOR.shutdown();
115132
LOGGER.info("Stop fast ping runner success!");
116133
}
117134
} catch (Throwable t) {
118135
LOGGER.error("Stop fast ping runner failed!", t);
119136
}
120137
}
121138

122-
private class HeartBeat implements Runnable {
139+
private AtomicBoolean running = new AtomicBoolean(false);
123140

124-
private AtomicBoolean running = new AtomicBoolean(false);
125-
126-
@Override
127-
public void run() {
128-
try {
129-
if (running.compareAndSet(false, true)) {
130-
// to ensure only one thread go there
131-
try {
132-
check();
133-
} finally {
134-
running.compareAndSet(true, false);
135-
}
141+
private void ping() {
142+
try {
143+
if (running.compareAndSet(false, true)) {
144+
// to ensure only one thread go there
145+
try {
146+
check();
147+
} finally {
148+
running.compareAndSet(true, false);
136149
}
137-
} catch (Throwable t) {
138-
LOGGER.error(t.getMessage(), t);
139150
}
151+
} catch (Throwable t) {
152+
LOGGER.error(t.getMessage(), t);
140153
}
154+
}
141155

142-
private void check() {
143-
List<Node> jobTrackers = application.getSubscribedNodeManager().getNodeList(NodeType.JOB_TRACKER);
144-
if (CollectionUtils.isEmpty(jobTrackers)) {
145-
return;
146-
}
147-
for (Node jobTracker : jobTrackers) {
148-
// 每个JobTracker 都要发送心跳
149-
if (beat(remotingClient, jobTracker.getAddress())) {
150-
remotingClient.addJobTracker(jobTracker);
151-
if (!remotingClient.isServerEnable()) {
152-
remotingClient.setServerEnable(true);
153-
application.getEventCenter().publishAsync(new EventInfo(EcTopic.JOB_TRACKER_AVAILABLE));
154-
} else {
155-
remotingClient.setServerEnable(true);
156-
}
157-
stopFastPing();
158-
startPing();
156+
private void check() {
157+
List<Node> jobTrackers = application.getSubscribedNodeManager().getNodeList(NodeType.JOB_TRACKER);
158+
if (CollectionUtils.isEmpty(jobTrackers)) {
159+
return;
160+
}
161+
for (Node jobTracker : jobTrackers) {
162+
// 每个JobTracker 都要发送心跳
163+
if (beat(remotingClient, jobTracker.getAddress())) {
164+
remotingClient.addJobTracker(jobTracker);
165+
if (!remotingClient.isServerEnable()) {
166+
remotingClient.setServerEnable(true);
167+
application.getEventCenter().publishAsync(new EventInfo(EcTopic.JOB_TRACKER_AVAILABLE));
159168
} else {
160-
remotingClient.removeJobTracker(jobTracker);
169+
remotingClient.setServerEnable(true);
161170
}
171+
stopFastPing();
172+
startPing();
173+
} else {
174+
remotingClient.removeJobTracker(jobTracker);
162175
}
163176
}
177+
}
164178

165-
/**
166-
* 发送心跳
167-
*
168-
* @param remotingClient
169-
* @param addr
170-
*/
171-
private boolean beat(RemotingClientDelegate remotingClient, String addr) {
179+
/**
180+
* 发送心跳
181+
*
182+
* @param remotingClient
183+
* @param addr
184+
*/
185+
private boolean beat(RemotingClientDelegate remotingClient, String addr) {
172186

173-
HeartBeatRequest commandBody = application.getCommandBodyWrapper().wrapper(new HeartBeatRequest());
187+
HeartBeatRequest commandBody = application.getCommandBodyWrapper().wrapper(new HeartBeatRequest());
174188

175-
RemotingCommand request = RemotingCommand.createRequestCommand(
176-
JobProtos.RequestCode.HEART_BEAT.code(), commandBody);
177-
try {
178-
RemotingCommand response = remotingClient.getNettyClient().invokeSync(addr, request, 5000);
179-
if (response != null && JobProtos.ResponseCode.HEART_BEAT_SUCCESS ==
180-
JobProtos.ResponseCode.valueOf(response.getCode())) {
181-
if (LOGGER.isDebugEnabled()) {
182-
LOGGER.debug("heart beat success! ");
183-
}
184-
return true;
189+
RemotingCommand request = RemotingCommand.createRequestCommand(
190+
JobProtos.RequestCode.HEART_BEAT.code(), commandBody);
191+
try {
192+
RemotingCommand response = remotingClient.getNettyClient().invokeSync(addr, request, 5000);
193+
if (response != null && JobProtos.ResponseCode.HEART_BEAT_SUCCESS ==
194+
JobProtos.ResponseCode.valueOf(response.getCode())) {
195+
if (LOGGER.isDebugEnabled()) {
196+
LOGGER.debug("heart beat success! ");
185197
}
186-
} catch (Exception e) {
187-
LOGGER.error(e.getMessage(), e);
198+
return true;
188199
}
189-
return false;
200+
} catch (Exception e) {
201+
LOGGER.error(e.getMessage(), e);
190202
}
203+
return false;
191204
}
192205

193206
}

lts-example/src/main/java/com/lts/job/example/api/JobClientTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,13 @@ public static void testProtector() {
6262
// jobClient.addConfig("job.fail.store", "leveldb"); // 默认
6363
// jobClient.addConfig("job.fail.store", "berkeleydb");
6464
// jobClient.addConfig("job.fail.store", "rocksdb");
65-
jobClient.addConfig("job.submit.concurrency.size", "3");
65+
jobClient.addConfig("job.submit.concurrency.size", "20");
6666
jobClient.start();
6767

68+
fastSubmit(jobClient);
69+
}
70+
71+
private static void fastSubmit(final JobClient jobClient) {
6872
final AtomicLong num = new AtomicLong();
6973
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
7074

@@ -81,7 +85,7 @@ public void run() {
8185
try {
8286
Response response = jobClient.submitJob(job);
8387
System.out.print(" " + num.incrementAndGet());
84-
if(num.incrementAndGet() % 50 == 0){
88+
if (num.incrementAndGet() % 50 == 0) {
8589
System.out.println("");
8690
}
8791
// System.out.println(JSONObject.toJSONString(response));
@@ -97,7 +101,6 @@ public void run() {
97101
}
98102
}).start();
99103
}
100-
101104
}
102105

103106
}

lts-example/src/main/java/com/lts/job/example/api/JobTrackerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ public class JobTrackerTest {
1212
public static void main(String[] args) {
1313

1414
// 1. 使用mongo做任务队列
15-
// testMongoQueue();
15+
testMongoQueue();
1616
// 2. 使用mysql做任务队列
17-
testMysqlQueue();
17+
// testMysqlQueue();
1818
}
1919

2020
/**

lts-example/src/main/java/com/lts/job/example/support/TestJobRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public void run(Job job) throws Throwable {
2525
bizLogger.info("测试,业务日志啊啊啊啊啊");
2626

2727
try {
28-
Thread.sleep(5*1000L);
28+
System.out.println("我要睡个1s");
29+
Thread.sleep(1000L);
2930
} catch (InterruptedException e) {
3031
e.printStackTrace();
3132
}

0 commit comments

Comments
 (0)