Skip to content

Commit 1823ffa

Browse files
committed
Merge pull request #18 from qq254963746/develop
Develop
2 parents 2652bfe + 7b600d9 commit 1823ffa

File tree

112 files changed

+2109
-1090
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

112 files changed

+2109
-1090
lines changed

job-client/src/main/java/com/lts/job/client/JobClient.java

Lines changed: 72 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.lts.job.client.support.JobFinishedHandler;
99
import com.lts.job.client.support.JobSubmitExecutor;
1010
import com.lts.job.client.support.JobSubmitProtector;
11+
import com.lts.job.client.support.SubmitCallback;
1112
import com.lts.job.core.Application;
1213
import com.lts.job.core.cluster.AbstractClientNode;
1314
import com.lts.job.core.constant.Constants;
@@ -21,15 +22,16 @@
2122
import com.lts.job.core.protocol.command.JobSubmitResponse;
2223
import com.lts.job.core.util.BatchUtils;
2324
import com.lts.job.core.util.CollectionUtils;
25+
import com.lts.job.core.util.CommonUtils;
2426
import com.lts.job.remoting.InvokeCallback;
25-
import com.lts.job.remoting.exception.RemotingCommandFieldCheckException;
2627
import com.lts.job.remoting.netty.NettyRequestProcessor;
2728
import com.lts.job.remoting.netty.ResponseFuture;
2829
import com.lts.job.remoting.protocol.RemotingCommand;
2930

3031
import java.util.Arrays;
3132
import java.util.List;
3233
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.TimeUnit;
3335

3436
/**
3537
* @author Robert HG (254963746@qq.com) on 7/25/14.
@@ -76,84 +78,105 @@ private Response protectSubmit(List<Job> jobs) throws JobSubmitException {
7678
return protector.execute(jobs, new JobSubmitExecutor<Response>() {
7779
@Override
7880
public Response execute(List<Job> jobs) throws JobSubmitException {
79-
return _submitJob(jobs);
81+
return submitJob(jobs, SubmitType.ASYNC);
8082
}
8183
});
8284
}
8385

84-
private Response _submitJob(final List<Job> jobs) throws JobSubmitException {
86+
private void checkFields(List<Job> jobs) {
8587
// 参数验证
8688
if (CollectionUtils.isEmpty(jobs)) {
87-
throw new JobSubmitException("提交任务不能为空!");
89+
throw new JobSubmitException("job can not be null!");
8890
}
8991
for (Job job : jobs) {
9092
if (job == null) {
91-
throw new JobSubmitException("提交任务不能为空!");
93+
throw new JobSubmitException("job can not be null!");
9294
} else {
9395
job.checkField();
9496
}
9597
}
98+
}
99+
100+
protected Response submitJob(final List<Job> jobs, SubmitType type) throws JobSubmitException {
101+
// 检查参数
102+
checkFields(jobs);
103+
96104
final Response response = new Response();
97105
try {
98106
JobSubmitRequest jobSubmitRequest = application.getCommandBodyWrapper().wrapper(new JobSubmitRequest());
99107
jobSubmitRequest.setJobs(jobs);
100108

101109
RemotingCommand requestCommand = RemotingCommand.createRequestCommand(JobProtos.RequestCode.SUBMIT_JOB.code(), jobSubmitRequest);
102110

103-
final CountDownLatch latch = new CountDownLatch(1);
104-
remotingClient.invokeAsync(requestCommand, new InvokeCallback() {
111+
SubmitCallback submitCallback = new SubmitCallback() {
105112
@Override
106-
public void operationComplete(ResponseFuture responseFuture) {
107-
try {
108-
RemotingCommand responseCommand = responseFuture.getResponseCommand();
109-
110-
if (responseCommand == null) {
111-
response.setFailedJobs(jobs);
112-
response.setSuccess(false);
113-
LOGGER.warn("提交任务失败: {}, {}",
114-
jobs, "JobTracker中断了");
115-
return;
116-
}
117-
118-
if (JobProtos.ResponseCode.JOB_RECEIVE_SUCCESS.code() == responseCommand.getCode()) {
119-
LOGGER.info("提交任务成功: {}", jobs);
120-
response.setSuccess(true);
121-
return;
122-
}
123-
// 失败的job
124-
JobSubmitResponse jobSubmitResponse = responseCommand.getBody();
125-
response.setFailedJobs(jobSubmitResponse.getFailedJobs());
113+
public void call(RemotingCommand responseCommand) {
114+
if (responseCommand == null) {
115+
response.setFailedJobs(jobs);
126116
response.setSuccess(false);
127-
response.setCode(JobProtos.ResponseCode.valueOf(responseCommand.getCode()).name());
128-
LOGGER.warn("提交任务失败: {}, {}, {}",
129-
jobs,
130-
responseCommand.getRemark(),
131-
jobSubmitResponse.getMsg());
132-
} finally {
133-
latch.countDown();
117+
LOGGER.warn("submit job failed: {}, {}",
118+
jobs, "JobTracker is broken");
119+
return;
134120
}
121+
122+
if (JobProtos.ResponseCode.JOB_RECEIVE_SUCCESS.code() == responseCommand.getCode()) {
123+
LOGGER.info("submit job success: {}", jobs);
124+
response.setSuccess(true);
125+
return;
126+
}
127+
// 失败的job
128+
JobSubmitResponse jobSubmitResponse = responseCommand.getBody();
129+
response.setFailedJobs(jobSubmitResponse.getFailedJobs());
130+
response.setSuccess(false);
131+
response.setCode(JobProtos.ResponseCode.valueOf(responseCommand.getCode()).name());
132+
LOGGER.warn("submit job failed: {}, {}, {}",
133+
jobs,
134+
responseCommand.getRemark(),
135+
jobSubmitResponse.getMsg());
135136
}
136-
});
137+
};
137138

138-
try {
139-
latch.await();
140-
} catch (InterruptedException e) {
141-
throw new RuntimeException("提交任务失败", e);
139+
if (SubmitType.ASYNC.equals(type)) {
140+
asyncSubmit(requestCommand, submitCallback);
141+
} else {
142+
syncSubmit(requestCommand, submitCallback);
142143
}
143-
144-
} catch (RemotingCommandFieldCheckException e) {
145-
response.setSuccess(false);
146-
response.setCode(ResponseCode.REQUEST_FILED_CHECK_ERROR);
147-
response.setMsg("the request body's field check error : " + e.getMessage());
148144
} catch (JobTrackerNotFoundException e) {
149145
response.setSuccess(false);
150146
response.setCode(ResponseCode.JOB_TRACKER_NOT_FOUND);
151147
response.setMsg("can not found JobTracker node!");
148+
} catch (Exception e) {
149+
response.setSuccess(false);
150+
response.setCode(ResponseCode.SYSTEM_ERROR);
151+
response.setMsg(CommonUtils.exceptionSimpleDesc(e));
152152
}
153153

154154
return response;
155155
}
156156

157+
private void asyncSubmit(RemotingCommand requestCommand, final SubmitCallback submitCallback) throws JobTrackerNotFoundException {
158+
final CountDownLatch latch = new CountDownLatch(1);
159+
remotingClient.invokeAsync(requestCommand, new InvokeCallback() {
160+
@Override
161+
public void operationComplete(ResponseFuture responseFuture) {
162+
try {
163+
submitCallback.call(responseFuture.getResponseCommand());
164+
} finally {
165+
latch.countDown();
166+
}
167+
}
168+
});
169+
try {
170+
latch.await(Constants.LATCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
171+
} catch (InterruptedException e) {
172+
throw new JobSubmitException("submit job failed, async request timeout!", e);
173+
}
174+
}
175+
176+
private void syncSubmit(RemotingCommand requestCommand, final SubmitCallback submitCallback) throws JobTrackerNotFoundException {
177+
submitCallback.call(remotingClient.invokeSync(requestCommand));
178+
}
179+
157180
/**
158181
* @param jobs
159182
* @return
@@ -192,4 +215,9 @@ protected NettyRequestProcessor getDefaultProcessor() {
192215
public void setJobFinishedHandler(JobFinishedHandler jobFinishedHandler) {
193216
this.jobFinishedHandler = jobFinishedHandler;
194217
}
218+
219+
enum SubmitType {
220+
SYNC, // 同步
221+
ASYNC // 异步
222+
}
195223
}

job-client/src/main/java/com/lts/job/client/RetryJobClient.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import com.lts.job.client.support.JobSubmitProtectException;
88
import com.lts.job.core.domain.Job;
99
import com.lts.job.core.support.RetryScheduler;
10+
import com.lts.job.core.util.JSONUtils;
11+
import sun.security.util.Resources_es;
1012

1113
import java.util.Arrays;
1214
import java.util.List;
@@ -31,7 +33,8 @@ protected boolean isRemotingEnable() {
3133
@Override
3234
protected boolean retry(List<Job> jobs) {
3335
try {
34-
return superSubmitJob(jobs).isSuccess();
36+
// 重试必须走同步,不然会造成文件锁,死锁
37+
return superSubmitJob(jobs, SubmitType.SYNC).isSuccess();
3538
} catch (Throwable t) {
3639
LOGGER.error(t.getMessage(), t);
3740
}
@@ -64,6 +67,8 @@ public Response submitJob(List<Job> jobs) {
6467
response.setFailedJobs(jobs);
6568
response.setCode(ResponseCode.SUBMIT_TOO_BUSY_AND_SAVE_FOR_LATER);
6669
response.setMsg(response.getMsg() + ", submit too busy , save local fail store and send later !");
70+
LOGGER.warn(JSONUtils.toJSONString(response));
71+
return response;
6772
}
6873
if (!response.isSuccess()) {
6974
try {
@@ -73,6 +78,7 @@ public Response submitJob(List<Job> jobs) {
7378
response.setSuccess(true);
7479
response.setCode(ResponseCode.SUBMIT_FAILED_AND_SAVE_FOR_LATER);
7580
response.setMsg(response.getMsg() + ", save local fail store and send later !");
81+
LOGGER.warn(JSONUtils.toJSONString(response));
7682
} catch (Exception e) {
7783
response.setSuccess(false);
7884
response.setMsg(e.getMessage());
@@ -85,4 +91,8 @@ public Response submitJob(List<Job> jobs) {
8591
private Response superSubmitJob(List<Job> jobs) {
8692
return super.submitJob(jobs);
8793
}
94+
95+
private Response superSubmitJob(List<Job> jobs, SubmitType type) {
96+
return super.submitJob(jobs, type);
97+
}
8898
}

job-client/src/main/java/com/lts/job/client/domain/ResponseCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ private ResponseCode(){}
2020
// 提交太块
2121
public static final String SUBMIT_TOO_BUSY_AND_SAVE_FOR_LATER = "14";
2222

23+
public static final String SYSTEM_ERROR = "15";
2324
}

job-client/src/main/java/com/lts/job/client/processor/JobFinishedProcessor.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ public class JobFinishedProcessor extends AbstractProcessor {
2323

2424
private JobFinishedHandler jobFinishedHandler;
2525

26-
public JobFinishedProcessor(RemotingClientDelegate remotingClient, JobFinishedHandler jobFinishedHandler) {
26+
public JobFinishedProcessor(RemotingClientDelegate remotingClient,
27+
JobFinishedHandler jobFinishedHandler) {
2728
super(remotingClient);
2829
this.jobFinishedHandler = jobFinishedHandler;
2930
if (this.jobFinishedHandler == null) {
@@ -36,9 +37,9 @@ public void handle(List<JobResult> jobResults) {
3637
if (CollectionUtils.isNotEmpty(jobResults)) {
3738
for (JobResult jobResult : jobResults) {
3839
if (jobResult.isSuccess()) {
39-
log.info("任务执行成功:" + jobResult);
40+
log.info("Job exec successful:" + jobResult);
4041
} else {
41-
log.info("任务执行失败:" + jobResult);
42+
log.info("Job exec failed:" + jobResult);
4243
}
4344
}
4445
}
@@ -48,7 +49,8 @@ public void handle(List<JobResult> jobResults) {
4849
}
4950

5051
@Override
51-
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
52+
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
53+
throws RemotingCommandException {
5254

5355
JobFinishedRequest requestBody = request.getBody();
5456
try {
@@ -57,6 +59,6 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
5759
LOGGER.error(t.getMessage(), t);
5860
}
5961

60-
return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.JOB_NOTIFY_SUCCESS.code(), "接受成功");
62+
return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.JOB_NOTIFY_SUCCESS.code(), "received successful");
6163
}
6264
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.lts.job.client.support;
2+
3+
import com.lts.job.remoting.protocol.RemotingCommand;
4+
5+
/**
6+
* @author Robert HG (254963746@qq.com) on 5/30/15.
7+
*/
8+
public interface SubmitCallback {
9+
10+
public void call(final RemotingCommand responseCommand);
11+
12+
}

job-core/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
<dependency>
8383
<groupId>com.alibaba</groupId>
8484
<artifactId>druid</artifactId>
85+
<scope>provided</scope>
8586
</dependency>
8687
<dependency>
8788
<groupId>com.sleepycat</groupId>

job-core/src/main/java/com/lts/job/core/cluster/AbstractJobNode.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ final public void start() {
6262

6363
registry.register(node);
6464

65-
LOGGER.info("启动成功!");
65+
LOGGER.info("started!");
6666

6767
} catch (Throwable e) {
68-
LOGGER.error("启动失败!", e);
68+
LOGGER.error("start failed!", e);
6969
}
7070
}
7171

@@ -91,9 +91,9 @@ final public void stop() {
9191

9292
innerStop();
9393

94-
LOGGER.info("停止成功!");
94+
LOGGER.info("stop success!");
9595
} catch (Throwable e) {
96-
LOGGER.error("停止失败!", e);
96+
LOGGER.error("stop failed!", e);
9797
}
9898
}
9999

@@ -102,7 +102,7 @@ public void destroy() {
102102
try {
103103
registry.destroy();
104104
} catch (Throwable e) {
105-
LOGGER.error("销毁失败!", e);
105+
LOGGER.error("destroy failed!", e);
106106
}
107107
}
108108

@@ -151,7 +151,7 @@ protected void initConfig() {
151151
node = NodeFactory.create(getNodeClass(), config);
152152
config.setNodeType(node.getNodeType());
153153

154-
LOGGER.info("当前节点配置:{}", config);
154+
LOGGER.info("Current node config :{}", config);
155155

156156
application.setEventCenter(eventCenterFactory.getEventCenter(config));
157157

job-core/src/main/java/com/lts/job/core/cluster/MasterElector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ public void removeNode(List<Node> removedNodes) {
101101
private void notifyListener() {
102102
boolean isMaster = false;
103103
if (application.getConfig().getIdentity().equals(master.getIdentity())) {
104-
LOGGER.info("Master节点变化为当前节点:{}", master);
104+
LOGGER.info("Current node become the master node:{}", master);
105105
isMaster = true;
106106
} else {
107-
LOGGER.info("Master节点为:{}", master);
107+
LOGGER.info("Master node is :{}", master);
108108
isMaster = false;
109109
}
110110

@@ -113,7 +113,7 @@ private void notifyListener() {
113113
try {
114114
masterChangeListener.change(master, isMaster);
115115
} catch (Throwable t) {
116-
LOGGER.error("masterChangeListener通知失败!", t);
116+
LOGGER.error("MasterChangeListener notify error!", t);
117117
}
118118
}
119119
}

job-core/src/main/java/com/lts/job/core/cluster/SubscribedNodeManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private void _addNode(Node node) {
5454
NODES.put(node.getNodeType(), nodeList);
5555
}
5656
nodeList.add(node);
57-
LOGGER.info("添加节点{}", node);
57+
LOGGER.info("add node {}", node);
5858
}
5959

6060
public List<Node> getNodeList(final NodeType nodeType, final String nodeGroup) {
@@ -79,7 +79,7 @@ private void removeNode(Node delNode) {
7979
for (Node node : nodeList) {
8080
if (node.getIdentity().equals(delNode.getIdentity())) {
8181
nodeList.remove(node);
82-
LOGGER.info("删除节点{}", node);
82+
LOGGER.info("remove node {}", node);
8383
}
8484
}
8585
}

job-core/src/main/java/com/lts/job/core/constant/Constants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,6 @@ public interface Constants {
7373

7474
public static final String PROCESSOR_THREAD = "job.processor.thread";
7575
public static final int DEFAULT_PROCESSOR_THREAD = 32 + AVAILABLE_PROCESSOR * 5;
76+
77+
public static final int LATCH_TIMEOUT_MILLIS = 10 * 60 * 1000; // 10分钟
7678
}

0 commit comments

Comments
 (0)