Skip to content

Commit 2822f90

Browse files
committed
Merge pull request #2 from qq254963746/develop
Develop
2 parents 7d2b3d0 + 8ff7a64 commit 2822f90

File tree

95 files changed

+1020
-892
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

95 files changed

+1020
-892
lines changed

README.md

Lines changed: 28 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
33

44
## 框架概况:
55
LTS是一个轻量级分布式任务调度框架,参考hadoop的部分思想。有三种角色, JobClient, JobTracker, TaskTracker。各个节点都是无状态的,可以部署多个,来实现负载均衡,实现更大的负载量, 并且框架具有很好的容错能力。
6-
采用Zookeeper暴露节点信息,master选举。Mongo存储任务队列和任务执行日志, netty做底层通信。
6+
采用多种注册中心(Zookeeper,redis等)进行节点信息暴露,master选举。Mongo存储任务队列和任务执行日志, netty做底层通信。
77
* JobClient : 主要负责提交任务, 和 接收任务执行反馈结果。
88
* JobTracker : 负责接收并分配任务,任务调度。
99
* TaskTracker: 负责执行任务,执行完反馈给JobTracker。
1010

11-
框架支持实时任务,也支持定时任务,同时也支持CronExpression, 有问题,请联系QQ254963746, 或加入QQ群:109500214 一起探讨
11+
框架支持实时任务,也支持定时任务,同时也支持CronExpression, 有问题,请加QQ群:109500214 一起完善,探讨
1212

1313
##架构图
1414
![Aaron Swartz](https://raw.githubusercontent.com/qq254963746/light-task-schedule/master/data/%E6%9E%B6%E6%9E%84%E5%9B%BE.png)
1515
##节点组:
1616
* 1. 一个节点组等同于一个集群,同一个节点组中的各个节点是对等的,外界无论连接节点组中的任务一个节点都是可以的。
17-
* 2. 每个节点组中都有一个master节点,采用zookeeper进行master选举(master宕机,会自动选举出新的master节点),框架会提供接口API来监听master节点的变化,用户可以自己使用master节点做自己想做的事情。
17+
* 2. 每个节点组中都有一个master节点(master宕机,会自动选举出新的master节点),框架会提供接口API来监听master节点的变化,用户可以自己使用master节点做自己想做的事情。
1818
* 3. JobClient和TaskTracker都可以存在多个节点组。譬如 JobClient 可以存在多个节点组。 譬如:JobClient 节点组为 ‘lts_WEB’ 中的一个节点提交提交一个 只有节点组为’lts_TRADE’的 TaskTracker 才能执行的任务。
1919
* 4. (每个集群中)JobTacker只有一个节点组。
2020
* 5. 多个JobClient节点组和多个TaskTracker节点组再加上一个JobTacker节点组, 组成一个大的集群。
@@ -38,69 +38,42 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
3838

3939
* 伸缩性:
4040
* 因为各个节点都是无状态的,可以动态增加机器部署实例, 节点关注者会自动发现。
41+
* 扩展性:
42+
* 采用和dubbo一样的SPI扩展方式,可以实现任务队列扩展,日志记录器扩展等
4143

4244
## 开发计划:
4345
* WEB后台管理
4446
* 框架优化
4547

4648
## 调用示例
47-
* 安装 zookeeper 和 mongo , 执行 data/mongo 目录下的 mongo.md 中的语句
49+
* 安装 zookeeper(或redis) 和 mongo (后提供其他任务队列实现方式)
4850

4951
运行 job-example模块中的例子(包含API启动例子和Spring例子)
5052
分别执行 JobTrackerTest TaskTrackerTest JobClientTest
5153

52-
这里给出的是java API(设置配置)方式启动, 也可以使用spring启动默认不启用spring,需引入job-ext-spring包
54+
这里给出的是java API(设置配置)方式启动, (spring启动和面添加)
5355

5456
## JobTracker 端
5557
```java
5658
final JobTracker jobTracker = new JobTracker();
5759
// 节点信息配置
5860
jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
59-
// jobTracker.setListenPort(35001); // 默认 35001
60-
// jobTracker.setClusterName("lts");
61-
61+
// jobTracker.setRegistryAddress("redis://127.0.0.1:6379");
62+
jobTracker.setListenPort(35002); // 默认 35001
63+
jobTracker.setClusterName("test_cluster");
64+
jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());
65+
// 设置业务日志记录
66+
// jobTracker.addConfig("job.logger", "mongo");
67+
// 任务队列用mongo
68+
jobTracker.addConfig("job.queue", "mongo");
6269
// mongo 配置
63-
Config config = new Config();
64-
config.setAddresses(new String[]{"127.0.0.1:27017"});
65-
config.setUsername("lts");
66-
config.setPassword("lts");
67-
config.setDbName("job");
68-
jobTracker.setStoreConfig(config);
69-
70+
jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017"); // 多个地址用逗号分割
71+
jobTracker.addConfig("mongo.database", "job");
72+
jobTracker.setOldDataHandler(new OldDataDeletePolicy());
73+
// 设置 zk 客户端用哪个, 可选 zkclient, curator 默认是 zkclient
74+
jobTracker.addConfig("zk.client", "zkclient");
7075
// 启动节点
7176
jobTracker.start();
72-
73-
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
74-
@Override
75-
public void run() {
76-
jobTracker.stop();
77-
}
78-
}));
79-
80-
```
81-
或者Spring配置
82-
```xml
83-
<bean id="mongoConfig" class="com.lts.job.store.Config">
84-
<property name="addresses">
85-
<array>
86-
<value>127.0.0.1:27017</value>
87-
</array>
88-
</property>
89-
<property name="username" value="lts"/>
90-
<property name="password" value="lts"/>
91-
<property name="dbName" value="job"/>
92-
</bean>
93-
<bean id="jobTracker" class="com.lts.job.spring.JobTrackerFactoryBean" init-method="start">
94-
<!--<property name="clusterName" value="lts"/>--> <!-- 集群名称 -->
95-
<!--<property name="listenPort" value="35001"/>--> <!-- 默认 35001 -->
96-
<property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
97-
<property name="storeConfig" ref="mongoConfig"/>
98-
<property name="masterChangeListeners">
99-
<array>
100-
<bean class="com.lts.job.example.support.MasterChangeListenerImpl"/>
101-
</array>
102-
</property>
103-
</bean>
10477
```
10578

10679
## TaskTracker端
@@ -109,83 +82,38 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
10982
taskTracker.setJobRunnerClass(TestJobRunner.class);
11083
// jobClient.setClusterName("lts");
11184
taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
85+
// taskTracker.setRegistryAddress("redis://127.0.0.1:6379");
11286
taskTracker.setNodeGroup("test_trade_TaskTracker");
87+
taskTracker.setClusterName("test_cluster");
11388
taskTracker.setWorkThreads(20);
11489
taskTracker.start();
115-
11690
// 任务执行类
11791
public class TestJobRunner implements JobRunner {
118-
11992
@Override
12093
public void run(Job job) throws Throwable {
121-
12294
System.out.println("我要执行"+ job);
12395
System.out.println(job.getParam("shopId"));
124-
12596
try {
12697
Thread.sleep(5*1000L);
12798
} catch (InterruptedException e) {
12899
e.printStackTrace();
129100
}
130-
131101
}
132102
}
133103
```
134-
或者Spring方式配置
135-
```xml
136-
<bean id="taskTracker" class="com.lts.job.spring.TaskTrackerFactoryBean" init-method="start">
137-
<!--<property name="clusterName" value="lts"/>-->
138-
<property name="nodeGroup" value="test_trade_TaskTracker"/><!-- 所属节点组名称 -->
139-
<property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
140-
<property name="jobRunnerClass" value="com.lts.job.example.support.TestJobRunner"/> <!-- 任务执行类 -->
141-
<property name="workThreads" value="1"/> <!-- 工作线程个数 -->
142-
<property name="masterChangeListeners"> <!-- 所属节点组中master节点变化监听器,可以不用配置 -->
143-
<array>
144-
<bean class="com.lts.job.example.support.MasterChangeListenerImpl"/>
145-
</array>
146-
</property>
147-
</bean>
148-
```
149104

150105
## JobClient端
151106
```java
152107
JobClient jobClient = new RetryJobClient();
153-
// JobClient jobClient = new JobClient();
154-
jobClient.setNodeGroup("test_JobClient");
155-
// jobClient.setClusterName("lts");
108+
// final JobClient jobClient = new JobClient();
109+
jobClient.setNodeGroup("test_jobClient");
110+
jobClient.setClusterName("test_cluster");
156111
jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
157-
jobClient.start();
158-
159-
// 提交任务
160-
Job job = new Job();
161-
job.setParam("shopId", "11111");
162-
job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
163-
// job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表达式
164-
// job.setTriggerTime(new Date().getTime()); // 支持指定时间执行
165-
Response response = jobClient.submitJob(job);
166-
```
167-
或者spring方式启动
168-
```xml
169-
<bean id="jobClient" class="com.lts.job.spring.JobClientFactoryBean" init-method="start">
170-
<property name="clientType" value="retry"/> <!-- 取值: 为空(默认normal), normal, retry -->
171-
<!--<property name="clusterName" value="lts"/>--> <!-- 默认 defaultCluster -->
172-
<property name="nodeGroup" value="test_JobClient"/> <!-- 节点组名称 -->
173-
<property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
174-
<property name="jobFinishedHandler">
175-
<bean class="com.lts.job.example.support.JobFinishedHandlerImpl"/> <!-- 任务完成处理器 -->
176-
</property>
177-
<property name="masterChangeListeners"><!-- 所属节点组中master节点变化监听器 -->
178-
<array>
179-
<bean class="com.lts.job.example.support.MasterChangeListenerImpl"/>
180-
</array>
181-
</property>
182-
</bean>
183-
```
184-
```java
185-
// 从Spring容器中取得JobClient Bean
186-
JobClient jobClient = (JobClient) applicationContext.getBean("jobClient");
112+
// jobClient.setRegistryAddress("redis://127.0.0.1:6379");
113+
jobClient.setJobInfoSavePath("xx");
187114
// 提交任务
188115
Job job = new Job();
116+
job.setTaskId("3213213123");
189117
job.setParam("shopId", "11111");
190118
job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
191119
// job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表达式

data/mongo/mongo.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ db.auth("lts", "lts");
88
db.JobPo.ensureIndex({"jobId":1},{unique:true});
99
db.JobPo.ensureIndex({"taskTrackerNodeGroup":1, "taskId":1},{unique:true});
1010
db.JobPo.ensureIndex({"taskTrackerIdentity":1});
11-
db.JobPo.ensureIndex({"priority":1, "triggerTime":1});
11+
db.JobPo.ensureIndex({"triggerTime":1, "priority":1, "gmtCreate": 1});
1212
db.JobPo.ensureIndex({"isRunning":1});
1313
db.JobPo.ensureIndex({"taskTrackerNodeGroup":1, "isRunning":1, "triggerTime":1});
1414

job-admin/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
44
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
55
<parent>
6-
<artifactId>light-task-schedule</artifactId>
6+
<artifactId>job-parent</artifactId>
77
<groupId>com.lts</groupId>
8-
<version>1.3.3-SNAPSHOT</version>
8+
<version>1.4.0-SNAPSHOT</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>war</packaging>

job-admin/src/main/java/com/lts/job/web/support/node/ZkNodeManager.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package com.lts.job.web.support.node;
22

3+
import com.lts.job.core.cluster.Config;
34
import com.lts.job.core.cluster.Node;
45
import com.lts.job.core.cluster.NodeType;
6+
import com.lts.job.core.constant.Constants;
57
import com.lts.job.core.registry.NodeRegistryUtils;
8+
import com.lts.job.core.registry.Registry;
9+
import com.lts.job.core.registry.RegistryFactory;
610
import com.lts.job.core.util.CollectionUtils;
711
import com.lts.job.zookeeper.ZookeeperClient;
812
import com.lts.job.zookeeper.zkclient.ZkClientZookeeperClient;
@@ -21,7 +25,11 @@ public class ZkNodeManager implements NodeManager {
2125
private ZookeeperClient zkClient;
2226

2327
public ZkNodeManager() {
24-
this.zkClient = new ZkClientZookeeperClient(AppConfigurer.getProperties("zookeeper.address"));
28+
Config config = new Config();
29+
config.setParameter(Constants.ZK_CLIENT_KEY, "");
30+
config.setRegistryAddress(AppConfigurer.getProperties("registry.address"));
31+
// Registry registry = RegistryFactory.getRegistry(config);
32+
// this.zkClient = new ZkClientZookeeperClient();
2533
}
2634

2735
@Override
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11

22
#zookeeper地址
3-
zookeeper.address=127.0.0.1:2181
3+
registry.address=zookeeper://127.0.0.1:2181

job-client/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
xmlns="http://maven.apache.org/POM/4.0.0"
44
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
55
<parent>
6-
<artifactId>light-task-schedule</artifactId>
6+
<artifactId>job-parent</artifactId>
77
<groupId>com.lts</groupId>
8-
<version>1.3.3-SNAPSHOT</version>
8+
<version>1.4.0-SNAPSHOT</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

job-core/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
xmlns="http://maven.apache.org/POM/4.0.0"
44
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
55
<parent>
6-
<artifactId>light-task-schedule</artifactId>
6+
<artifactId>job-parent</artifactId>
77
<groupId>com.lts</groupId>
8-
<version>1.3.3-SNAPSHOT</version>
8+
<version>1.4.0-SNAPSHOT</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
11-
11+
<packaging>jar</packaging>
1212
<artifactId>job-core</artifactId>
1313
</project>

job-core/src/main/java/com/lts/job/core/cluster/AbstractJobNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void destroy() {
106106
}
107107

108108
private void initRegistry() {
109-
registry = RegistryFactory.getRegistry(application);
109+
registry = RegistryFactory.getRegistry(config);
110110
if (registry instanceof AbstractRegistry) {
111111
((AbstractRegistry) registry).setNode(node);
112112
}

job-core/src/main/java/com/lts/job/core/constant/Constants.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,13 @@ public interface Constants {
5555
/**
5656
* 注册中心自动重连时间
5757
*/
58-
public static final String REGISTRY_RECONNECT_PERIOD_KEY = "reconnect.period";
58+
public static final String REGISTRY_RECONNECT_PERIOD_KEY = "reconnect.period";
5959

60-
public static final int DEFAULT_REGISTRY_RECONNECT_PERIOD = 3 * 1000;
60+
public static final int DEFAULT_REGISTRY_RECONNECT_PERIOD = 3 * 1000;
6161

62+
public static final String ZK_CLIENT_KEY = "zk.client";
63+
64+
public static final String JOB_LOGGER_KEY = "job.logger";
65+
66+
public static final String JOB_QUEUE_KEY = "job.queue";
6267
}

job-core/src/main/java/com/lts/job/core/extension/ExtensionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package com.lts.job.core.extension;
1717

1818
/**
19-
* Created by hugui on 5/18/15.
19+
* @author Robert HG (254963746@qq.com) on 5/18/15.
2020
*/
2121
@SPI
2222
public interface ExtensionFactory {

0 commit comments

Comments
 (0)