Skip to content

Commit be20668

Browse files
author
胡贵
committed
加入LTS自带业务重试功能
1 parent 538cb5d commit be20668

File tree

6 files changed

+21
-12
lines changed

6 files changed

+21
-12
lines changed

lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ private void finishProcess(List<TaskTrackerJobResult> results) {
287287
} else {
288288
// 表示下次还要执行
289289
try {
290+
cronJobPo.setTaskTrackerIdentity(null);
290291
cronJobPo.setIsRunning(false);
291292
cronJobPo.setTriggerTime(nextTriggerTime.getTime());
292293
cronJobPo.setGmtModified(System.currentTimeMillis());
@@ -316,8 +317,6 @@ private void retryProcess(List<TaskTrackerJobResult> results) {
316317
// 重试次数+1
317318
jobPo.setRetryTimes((jobPo.getRetryTimes() == null ? 0 : jobPo.getRetryTimes()) + 1);
318319
Long nextRetryTriggerTime = DateUtils.addMinute(new Date(), jobPo.getRetryTimes()).getTime();
319-
// 延迟重试时间就等于重试次数(分钟)
320-
jobPo.setTriggerTime(nextRetryTriggerTime);
321320

322321
boolean needAdd = true;
323322

@@ -329,6 +328,7 @@ private void retryProcess(List<TaskTrackerJobResult> results) {
329328
if (nextTriggerTime != null && nextTriggerTime.getTime() < nextRetryTriggerTime) {
330329
// 表示下次还要执行, 并且下次执行时间比下次重试时间要早, 那么不重试,直接使用下次的执行时间
331330
try {
331+
cronJobPo.setTaskTrackerIdentity(null);
332332
cronJobPo.setIsRunning(false);
333333
cronJobPo.setTriggerTime(nextTriggerTime.getTime());
334334
cronJobPo.setGmtModified(System.currentTimeMillis());
@@ -345,6 +345,9 @@ private void retryProcess(List<TaskTrackerJobResult> results) {
345345
if (needAdd) {
346346
// 加入到队列, 重试
347347
jobPo.setIsRunning(false);
348+
jobPo.setTaskTrackerIdentity(null);
349+
// 延迟重试时间就等于重试次数(分钟)
350+
jobPo.setTriggerTime(nextRetryTriggerTime);
348351
application.getExecutableJobQueue().add(jobPo);
349352
}
350353
// 从正在执行的队列中移除

lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.lts.biz.logger.domain.JobLogPo;
44
import com.lts.biz.logger.domain.LogType;
5+
import com.lts.core.commons.utils.DateUtils;
56
import com.lts.core.constant.Constants;
67
import com.lts.core.constant.Level;
78
import com.lts.core.domain.Job;
@@ -171,6 +172,7 @@ public void operationComplete(ResponseFuture responseFuture) {
171172
JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo);
172173
jobLogPo.setSuccess(true);
173174
jobLogPo.setLogType(LogType.SENT);
175+
jobLogPo.setLogTime(DateUtils.currentTimeMillis());
174176
jobLogPo.setLevel(Level.INFO);
175177
application.getJobLogger().log(jobLogPo);
176178

lts-logger/lts-logger-mysql/src/main/java/com/lts/biz/logger/mysql/MysqlJobLogger.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ public MysqlJobLogger(Config config) {
3939
insertSQL = "INSERT INTO `lts_job_log_po` (`log_time`,`gmt_created`, `log_type`, `success`, `msg`" +
4040
",`task_tracker_identity`, `level`, `task_id`, `job_id`" +
4141
", `priority`, `submit_node_group`, `task_tracker_node_group`, `ext_params`, `need_feedback`" +
42-
", `cron_expression`, `trigger_time`)" +
43-
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
42+
", `cron_expression`, `trigger_time`, `retry_times`)" +
43+
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
4444
}
4545

4646
@Override
@@ -65,7 +65,8 @@ public void log(JobLogPo jobLogPo) {
6565
JSONUtils.toJSONString(jobLogPo.getExtParams()),
6666
jobLogPo.isNeedFeedback(),
6767
jobLogPo.getCronExpression(),
68-
jobLogPo.getTriggerTime()
68+
jobLogPo.getTriggerTime(),
69+
jobLogPo.getRetryTimes()
6970
);
7071
} catch (SQLException e) {
7172
throw new JobLogException(e.getMessage(), e);
@@ -80,17 +81,17 @@ public void log(List<JobLogPo> jobLogPos) {
8081
String prefixSQL = "INSERT INTO `lts_job_log_po` ( `log_time`, `gmt_created`, `log_type`, `success`, `msg`" +
8182
",`task_tracker_identity`, `level`, `task_id`, `job_id`" +
8283
", `priority`, `submit_node_group`, `task_tracker_node_group`, `ext_params`, `need_feedback`" +
83-
", `cron_expression`, `trigger_time`) VALUES ";
84+
", `cron_expression`, `trigger_time`, `retry_times`) VALUES ";
8485
int size = jobLogPos.size();
8586
for (int i = 0; i < size; i++) {
8687
if (i == size - 1) {
87-
prefixSQL += "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
88+
prefixSQL += "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
8889
} else {
89-
prefixSQL += "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),";
90+
prefixSQL += "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),";
9091
}
9192
}
9293

93-
Object[][] params = new Object[size][16];
94+
Object[][] params = new Object[size][17];
9495
int index = 0;
9596
for (JobLogPo jobLogPo : jobLogPos) {
9697
int i = index++;
@@ -110,6 +111,7 @@ public void log(List<JobLogPo> jobLogPos) {
110111
params[i][13] = jobLogPo.isNeedFeedback();
111112
params[i][14] = jobLogPo.getCronExpression();
112113
params[i][15] = jobLogPo.getTriggerTime();
114+
params[i][16] = jobLogPo.getRetryTimes();
113115
}
114116

115117
try {
@@ -142,6 +144,7 @@ public List<JobLogPo> handle(ResultSet rs) throws SQLException {
142144
jobLogPo.setNeedFeedback(rs.getBoolean("need_feedback"));
143145
jobLogPo.setCronExpression(rs.getString("cron_expression"));
144146
jobLogPo.setTriggerTime(rs.getLong("trigger_time"));
147+
jobLogPo.setRetryTimes(rs.getInt("retry_times"));
145148
result.add(jobLogPo);
146149
}
147150
return result;

lts-logger/lts-logger-mysql/src/main/resources/sql/lts_job_log_po.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ CREATE TABLE IF NOT EXISTS `lts_job_log_po` (
1414
`submit_node_group` varchar(64) DEFAULT NULL COMMENT '提交节点group',
1515
`task_tracker_node_group` varchar(64) DEFAULT NULL COMMENT '执行节点group',
1616
`ext_params` text COMMENT '额外参数',
17-
`needFeedback` tinyint(4) DEFAULT NULL COMMENT '是否需要反馈',
17+
`need_feedback` tinyint(4) DEFAULT NULL COMMENT '是否需要反馈',
1818
`cron_expression` varchar(32) DEFAULT NULL COMMENT 'cron表达式',
1919
`trigger_time` bigint(20) DEFAULT NULL COMMENT '触发时间',
20+
`retry_times` int(11) DEFAULT NULL COMMENT '重试次数',
2021
PRIMARY KEY (`id`),
2122
KEY `log_time` (`log_time`),
2223
KEY `task_id_task_tracker_node_group` (`task_id`,`task_tracker_node_group`)

lts-queue/lts-queue-mysql/src/main/resources/sql/lts_cron_job_queue.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ CREATE TABLE IF NOT EXISTS `{tableName}` (
1616
`trigger_time` bigint(20) DEFAULT NULL COMMENT '下一次执行时间',
1717
PRIMARY KEY (`id`),
1818
UNIQUE KEY `idx_job_id` (`job_id`),
19-
UNIQUE KEY `idx_taskId_taskTrackerNodeGroup` (`task_id`, `task_tracker_node_group`),
19+
UNIQUE KEY `idx_taskId_taskTrackerNodeGroup` (`task_id`, `task_tracker_node_group`)
2020
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
CREATE TABLE IF NOT EXISTS `{tableName}` (
22
`node_type` varchar(16) NOT NULL DEFAULT '' COMMENT '节点类型',
33
`name` varchar(64) NOT NULL DEFAULT '' COMMENT '名字',
4-
`gmt_created` timestamp NULL DEFAULT NULL COMMENT '创建时间',
4+
`gmt_created` bigint(20) NULL DEFAULT NULL COMMENT '创建时间',
55
PRIMARY KEY (`node_type`,`name`)
66
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

0 commit comments

Comments
 (0)