Skip to content

Commit cf1cf22

Browse files
committed
Merge pull request #36 from qq254963746/develop
Develop
2 parents a084f54 + 0d1bb5e commit cf1cf22

File tree

5 files changed

+64
-84
lines changed

5 files changed

+64
-84
lines changed

README.md

Lines changed: 33 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,106 +1,83 @@
11
LTS 轻量级分布式任务调度框架(Light Task Schedule)
22
-----------------
33

4-
## 框架概况:
4+
###框架概况:
55
LTS是一个轻量级分布式任务调度框架,参考hadoop的部分思想。有三种角色, JobClient, JobTracker, TaskTracker。各个节点都是无状态的,可以部署多个,来实现负载均衡,实现更大的负载量, 并且框架具有很好的容错能力。
66
采用多种注册中心(Zookeeper,redis等)进行节点信息暴露,master选举。(Mongo or Mysql)存储任务队列和任务执行日志, netty做底层通信。
77
* JobClient : 主要负责提交任务, 和 接收任务执行反馈结果。
88
* JobTracker : 负责接收并分配任务,任务调度。
99
* TaskTracker: 负责执行任务,执行完反馈给JobTracker。
1010

11-
框架支持实时任务,也支持定时任务,同时也支持CronExpression, 有问题,请加QQ群:109500214 一起完善,探讨
11+
支持任务类型:
12+
* 实时任务
13+
* 也支持定时任务 (如:3天之后执行)
14+
* CronExpression (如:0 0/1 * * * ?)
1215

13-
##架构图
16+
感兴趣,请加QQ群:109500214 一起探讨、完善。并且记得star一下哈,3Q
17+
18+
###架构图
1419
![Aaron Swartz](https://raw.githubusercontent.com/qq254963746/light-task-schedule/master/doc/LTS_architecture.png)
15-
##节点组:
20+
####节点组:
1621
* 1. 一个节点组等同于一个集群,同一个节点组中的各个节点是对等的,外界无论连接节点组中的任务一个节点都是可以的。
1722
* 2. 每个节点组中都有一个master节点(master宕机,会自动选举出新的master节点),框架会提供接口API来监听master节点的变化,用户可以自己使用master节点做自己想做的事情。
1823
* 3. JobClient和TaskTracker都可以存在多个节点组。譬如 JobClient 可以存在多个节点组。 譬如:JobClient 节点组为 ‘lts_WEB’ 中的一个节点提交提交一个 只有节点组为’lts_TRADE’的 TaskTracker 才能执行的任务。
1924
* 4. (每个集群中)JobTacker只有一个节点组。
2025
* 5. 多个JobClient节点组和多个TaskTracker节点组再加上一个JobTacker节点组, 组成一个大的集群。
2126

22-
## 工作流程:
23-
* 1. JobClient 提交一个 任务 给 JobTracker, 这里我提供了两种客户端API, 一种是如果JobTracker 不存在或者提交失败,直接返回提交失败。另一种客户端是重试客户端, 如果提交失败,先存储到本地leveldb(可以使用NFS来达到同个节点组共享leveldb文件的目的,多线程访问,做了文件锁处理),返回给客户端提交成功的信息,待JobTracker可用的时候,再将任务提交。
24-
* 2. JobTracker 收到JobClient提交来的任务,先生成一个唯一的JobID。然后将任务储存在Mongo集群中。JobTracker 发现有(任务执行的)可用的TaskTracker节点(组) 之后,将优先级最大,最先提交的任务分发给TaskTracker。这里JobTracker会优先分配给比较空闲的TaskTracker节点,达到负载均衡。
25-
* 3. TaskTracker 收到JobTracker分发来的任务之后,执行。执行完毕之后,再反馈任务执行结果给JobTracker(成功or 失败[失败有失败错误信息]),如果发现JobTacker不可用,那么存储本地leveldb,等待TaskTracker可用的时候再反馈。反馈结果的同时,询问JobTacker有没有新的任务要执行。
26-
* 4. JobTacker收到TaskTracker节点的任务结果信息,生成并插入(mongo or mysql)任务执行日志。根据任务信息决定要不要反馈给客户端。不需要反馈的直接删除, 需要反馈的(同样JobClient不可用存储文件,等待可用重发)。
27-
* 5. JobClient 收到任务执行结果,进行自己想要的逻辑处理。
28-
29-
## 特性
27+
###工作流程:
28+
* 1. JobClient 提交一个 任务 给 JobTracker, 这里我提供了两种客户端API, 一种是如果JobTracker 不存在或者提交失败,直接返回提交失败。另一种客户端是重试客户端, 如果提交失败,先存储到本地FailStore(可以使用NFS来达到同个节点组共享leveldb文件的目的,多线程访问,已经做了文件锁处理),返回给客户端提交成功的信息,待JobTracker可用的时候,再将任务提交。
29+
* 2. JobTracker收到JobClient提交来的任务,将任务存入任务队列。JobTracker等待TaskTracker的Pull请求,然后将任务Push给TaskTracker去执行。
30+
* 3. TaskTracker收到JobTracker分发来的任务之后,然后从线程池中拿到一个线程去执行。执行完毕之后,再反馈任务执行结果给JobTracker(成功or 失败[失败有失败错误信息]),如果发现JobTacker不可用,那么存储本地FailStore,等待TaskTracker可用的时候再反馈。反馈结果的同时,询问JobTacker有没有新的任务要执行。
31+
* 4. JobTacker收到TaskTracker节点的任务结果信息。根据任务信息决定要不要反馈给客户端。不需要反馈的直接删除,需要反馈的,直接反馈,反馈失败进入FeedbackQueue, 等待重新反馈。
32+
* 5. JobClient收到任务执行结果,进行自己想要的逻辑处理。
3033

34+
###特性
3135
* 负载均衡:
32-
* JobClient 和 TaskTracker会随机连接JobTracker节点组中的一个节点,实现JobTracker负载均衡。当连接上后,将一直保持连接这个节点,保持连接通道,直到这个节点不可用,减少每次都重新连接一个节点带来的性能开销。
33-
* JobTracker 分发任务时,是优先分配给最空闲的一个TaskTracker节点,实现TaskTracker节点的负载均衡。
36+
* JobClient和TaskTracker可是根据自己设置的负载均衡策略来请求JobTracker节点组中的一个节点。当连接上后将一直保持连接这个节点,保持连接通道,直到这个节点不可用,减少每次都重新连接一个节点带来的性能开销。
3437

3538
* 健壮性:
3639
* 当节点组中的一个节点当机之后,自动转到其他节点工作。当整个节点组当机之后,将会采用存储文件的方式,待节点组可用的时候进行重发。
37-
* 当执行任务的TaskTracker节点当机之后,JobTracker 会将这个TaskTracker上的未完成的任务(死任务),重新分配给节点组中其他节点执行。
40+
* 当执行任务的TaskTracker节点当机之后,JobTracker会将这个TaskTracker上的未完成的任务(死任务),重新分配给节点组中其他节点执行。
3841

3942
* 伸缩性:
4043
* 因为各个节点都是无状态的,可以动态增加机器部署实例, 节点关注者会自动发现。
4144
* 扩展性:
4245
* 采用和dubbo一样的SPI扩展方式,可以实现任务队列扩展,日志记录器扩展等
4346

44-
## 开发计划:
45-
* WEB后台管理
46-
* 框架优化
47-
48-
## 调用示例
49-
* 安装 zookeeper(或redis) 和 mongo(或mysql) (后提供其他任务队列实现方式)
47+
###日志记录
48+
对于任务的分发,执行,还有用户通过 (BizLogger) 【LtsLoggerFactory.getBizLogger()】 输入的业务日志,LTS都有记录,用户可以在LTS Admin 后台界面查看某个任务的所有日志,可以实时查看这个任务的执行情况。
5049

51-
运行 job-example模块中的例子(包含API启动例子和Spring例子)
52-
分别执行 JobTrackerTest TaskTrackerTest JobClientTest
50+
###开发计划:
51+
* WEB后台管理:性能统计分析,预警等
52+
* 实现LTS的分布式队列存储
5353

54-
这里给出的是java API(设置配置)方式启动, (spring启动和面添加)
54+
###LTS Admin
55+
![Aaron Swartz](https://raw.githubusercontent.com/qq254963746/light-task-schedule/master/doc/LTS_Admin.png)
56+
###调用示例
57+
下面提供的是最简单的配置方式。更多配置请查看 lts-example 模块下的 API 调用方式例子.
5558

56-
## JobTracker 端
59+
####JobTracker 端
5760
```java
5861
final JobTracker jobTracker = new JobTracker();
5962
// 节点信息配置
6063
jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
61-
// jobTracker.setRegistryAddress("redis://127.0.0.1:6379");
62-
jobTracker.setListenPort(35002); // 默认 35001
63-
jobTracker.setClusterName("test_cluster");
64-
jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());
65-
// 设置业务日志记录
66-
// jobTracker.addConfig("job.logger", "console"); // 默认
67-
// jobTracker.addConfig("job.logger", "mysql");
68-
// jobTracker.addConfig("job.logger", "mongo");
69-
7064
// 1. 任务队列用mongo
7165
jobTracker.addConfig("job.queue", "mongo");
7266
// mongo 配置
73-
jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017"); // 多个地址用逗号分割
74-
jobTracker.addConfig("mongo.database", "job");
75-
76-
// 2. 任务队里用mysql
77-
// jobTracker.addConfig("job.queue", "mysql");
78-
// mysql 配置
79-
// jobTracker.addConfig("jdbc.url", "jdbc:mysql://127.0.0.1:3306/lts");
80-
// jobTracker.addConfig("jdbc.username", "root");
81-
// jobTracker.addConfig("jdbc.password", "root");
82-
67+
jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017");
68+
jobTracker.addConfig("mongo.database", "lts");
8369
jobTracker.setOldDataHandler(new OldDataDeletePolicy());
84-
// 设置 zk 客户端用哪个, 可选 zkclient, curator 默认是 zkclient
85-
jobTracker.addConfig("zk.client", "zkclient");
8670
// 启动节点
8771
jobTracker.start();
8872
```
8973

90-
## TaskTracker端
74+
#### TaskTracker端
9175
```java
9276
TaskTracker taskTracker = new TaskTracker();
9377
taskTracker.setJobRunnerClass(TestJobRunner.class);
94-
// jobClient.setClusterName("lts");
9578
taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
96-
// taskTracker.setRegistryAddress("redis://127.0.0.1:6379");
9779
taskTracker.setNodeGroup("test_trade_TaskTracker");
98-
taskTracker.setClusterName("test_cluster");
9980
taskTracker.setWorkThreads(20);
100-
// taskTracker.addConfig("job.fail.store", "leveldb"); // 默认
101-
// taskTracker.addConfig("job.fail.store", "berkeleydb");
102-
// taskTracker.addConfig("job.fail.store", "rocksdb");
103-
// taskTracker.setLoadBalance("consistenthash");
10481
taskTracker.start();
10582
// 任务执行类
10683
public class TestJobRunner implements JobRunner {
@@ -117,18 +94,12 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
11794
}
11895
```
11996

120-
## JobClient端
97+
#### JobClient端
12198
```java
12299
JobClient jobClient = new RetryJobClient();
123100
// final JobClient jobClient = new JobClient();
124101
jobClient.setNodeGroup("test_jobClient");
125-
jobClient.setClusterName("test_cluster");
126102
jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
127-
// jobClient.setRegistryAddress("redis://127.0.0.1:6379");
128-
// jobClient.addConfig("job.fail.store", "leveldb"); // 默认
129-
// jobClient.addConfig("job.fail.store", "berkeleydb");
130-
// jobClient.addConfig("job.fail.store", "rocksdb");
131-
// jobClient.setLoadBalance("consistenthash");
132103
jobClient.start();
133104

134105
// 提交任务
@@ -137,7 +108,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
137108
job.setParam("shopId", "11111");
138109
job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
139110
// job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表达式
140-
// job.setTriggerTime(new Date().getTime()); // 支持指定时间执行
111+
// job.setTriggerTime(new Date()); // 支持指定时间执行
141112
Response response = jobClient.submitJob(job);
142113
```
143114

doc/LTS_Admin.png

115 KB
Loading

lts-core/src/main/java/com/lts/job/core/domain/Job.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.lts.job.core.commons.utils.StringUtils;
88
import com.lts.job.remoting.annotation.NotNull;
99

10+
import java.util.Date;
1011
import java.util.HashMap;
1112
import java.util.Map;
1213

@@ -126,6 +127,12 @@ public boolean isSchedule() {
126127
return this.cronExpression != null && !"".equals(this.cronExpression.trim());
127128
}
128129

130+
public void setTriggerTime(Date date){
131+
if(date != null){
132+
this.triggerTime = date.getTime();
133+
}
134+
}
135+
129136
public Long getTriggerTime() {
130137
return triggerTime;
131138
}

lts-core/src/main/java/com/lts/job/core/remoting/RemotingClientDelegate.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ public void invokeAsync(RemotingCommand request, InvokeCallback invokeCallback)
121121
remotingClient.invokeAsync(jobTracker.getAddress(), request,
122122
application.getConfig().getInvokeTimeoutMillis(), invokeCallback);
123123
this.serverEnable = true;
124+
// publish msg
125+
EventInfo eventInfo = new EventInfo(EcTopic.NO_JOB_TRACKER_AVAILABLE);
126+
application.getEventCenter().publishAsync(eventInfo);
124127

125128
} catch (Throwable e) {
126129
// 将这个JobTracker移除
@@ -152,6 +155,9 @@ public void invokeOneway(RemotingCommand request)
152155
remotingClient.invokeOneway(jobTracker.getAddress(), request,
153156
application.getConfig().getInvokeTimeoutMillis());
154157
this.serverEnable = true;
158+
// publish msg
159+
EventInfo eventInfo = new EventInfo(EcTopic.NO_JOB_TRACKER_AVAILABLE);
160+
application.getEventCenter().publishAsync(eventInfo);
155161

156162
} catch (Throwable e) {
157163
// 将这个JobTracker移除

lts-job-tracker/src/main/java/com/lts/job/tracker/support/JobPusher.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ public class JobPusher {
4040

4141
public JobPusher(JobTrackerApplication application) {
4242
this.application = application;
43-
executor = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 5
44-
, new NamedThreadFactory(JobPusher.class.getSimpleName()));
43+
executor = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 5, new NamedThreadFactory(JobPusher.class.getSimpleName()));
4544
}
4645

4746
public void push(final RemotingServerDelegate remotingServer, final JobPullRequest request) {
@@ -69,9 +68,10 @@ private void pushJob(RemotingServerDelegate remotingServer, JobPullRequest reque
6968
String nodeGroup = request.getNodeGroup();
7069
String identity = request.getIdentity();
7170
// 更新TaskTracker的可用线程数
72-
application.getTaskTrackerManager().updateTaskTrackerAvailableThreads(nodeGroup, identity, request.getAvailableThreads(), request.getTimestamp());
73-
74-
TaskTrackerNode taskTrackerNode = application.getTaskTrackerManager().getTaskTrackerNode(nodeGroup, identity);
71+
application.getTaskTrackerManager().updateTaskTrackerAvailableThreads(nodeGroup,
72+
identity, request.getAvailableThreads(), request.getTimestamp());
73+
TaskTrackerNode taskTrackerNode = application.getTaskTrackerManager().
74+
getTaskTrackerNode(nodeGroup, identity);
7575

7676
if (taskTrackerNode == null) {
7777
return;
@@ -81,24 +81,20 @@ private void pushJob(RemotingServerDelegate remotingServer, JobPullRequest reque
8181

8282
while (availableThreads > 0) {
8383
// 推送任务
84-
int code = sendJob(remotingServer, taskTrackerNode);
85-
if (code == NO_JOB) {
86-
// 没有可以执行的任务, 直接停止
87-
break;
88-
}
89-
if (code == PUSH_FAILED) {
84+
PushResult result = sendJob(remotingServer, taskTrackerNode);
85+
if (result == PushResult.SUCCESS) {
86+
availableThreads = taskTrackerNode.getAvailableThread().decrementAndGet();
87+
} else {
9088
break;
9189
}
92-
availableThreads = taskTrackerNode.getAvailableThread().get();
9390
}
9491
}
9592

96-
// 没有任务可执行
97-
private final int NO_JOB = 1;
98-
// 推送成功
99-
private final int PUSH_SUCCESS = 2;
100-
// 推送失败
101-
private final int PUSH_FAILED = 3;
93+
private enum PushResult {
94+
NO_JOB, // 没有任务可执行
95+
SUCCESS, //推送成功
96+
FAILED //推送失败
97+
}
10298

10399
/**
104100
* 是否推送成功
@@ -107,7 +103,7 @@ private void pushJob(RemotingServerDelegate remotingServer, JobPullRequest reque
107103
* @param taskTrackerNode
108104
* @return
109105
*/
110-
private int sendJob(RemotingServerDelegate remotingServer, TaskTrackerNode taskTrackerNode) {
106+
private PushResult sendJob(RemotingServerDelegate remotingServer, TaskTrackerNode taskTrackerNode) {
111107

112108
String nodeGroup = taskTrackerNode.getNodeGroup();
113109
String identity = taskTrackerNode.getIdentity();
@@ -116,7 +112,7 @@ private int sendJob(RemotingServerDelegate remotingServer, TaskTrackerNode taskT
116112
JobPo jobPo = application.getExecutableJobQueue().take(nodeGroup, identity);
117113

118114
if (jobPo == null) {
119-
return NO_JOB;
115+
return PushResult.NO_JOB;
120116
}
121117

122118
JobPushRequest body = application.getCommandBodyWrapper().wrapper(new JobPushRequest());
@@ -162,7 +158,7 @@ public void operationComplete(ResponseFuture responseFuture) {
162158
LOGGER.debug("Job push failed! nodeGroup=" + nodeGroup + ", identity=" + identity + ", job=" + job);
163159
}
164160
application.getExecutableJobQueue().resume(jobPo);
165-
return PUSH_FAILED;
161+
return PushResult.FAILED;
166162
}
167163
try {
168164
application.getExecutingJobQueue().add(jobPo);
@@ -178,6 +174,6 @@ public void operationComplete(ResponseFuture responseFuture) {
178174
jobLogPo.setLevel(Level.INFO);
179175
application.getJobLogger().log(jobLogPo);
180176

181-
return PUSH_SUCCESS;
177+
return PushResult.SUCCESS;
182178
}
183179
}

0 commit comments

Comments
 (0)