Skip to content

Commit db08a7e

Browse files
author
胡贵
committed
优化重试策略
1 parent 135c707 commit db08a7e

File tree

4 files changed

+47
-36
lines changed

4 files changed

+47
-36
lines changed

job-core/src/main/java/com/lts/job/core/cluster/Config.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@ public String getParameter(String key, String defaultValue) {
137137
return value;
138138
}
139139

140+
public Map<String, String> getParameters() {
141+
return parameters;
142+
}
143+
140144
private Map<String, Number> getNumbers() {
141145
if (numbers == null) { // 允许并发重复创建
142146
numbers = new ConcurrentHashMap<String, Number>();

job-core/src/main/java/com/lts/job/core/file/FileLock.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,10 @@
55

66
import java.io.File;
77
import java.io.IOException;
8-
import java.nio.channels.ClosedChannelException;
98
import java.nio.channels.FileChannel;
10-
import java.nio.channels.OverlappingFileLockException;
119
import java.nio.file.Path;
1210
import java.nio.file.Paths;
1311
import java.nio.file.StandardOpenOption;
14-
import java.text.SimpleDateFormat;
15-
import java.util.Date;
1612
import java.util.HashMap;
1713
import java.util.Map;
1814
import java.util.concurrent.Semaphore;

job-core/src/main/java/com/lts/job/core/support/RetryScheduler.java

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.lts.job.core.failstore.FailStoreFactory;
99
import com.lts.job.core.logger.Logger;
1010
import com.lts.job.core.logger.LoggerFactory;
11+
import com.lts.job.core.util.CollectionUtils;
1112
import com.lts.job.core.util.GenericsUtils;
1213
import com.lts.job.core.util.JSONUtils;
1314

@@ -51,13 +52,10 @@ public void start() {
5152
if (RETRY_EXECUTOR_SERVICE == null) {
5253
RETRY_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
5354
// 这个时间后面再去优化
54-
scheduledFuture = RETRY_EXECUTOR_SERVICE.scheduleWithFixedDelay(new CheckRunner(), 30, 30, TimeUnit.SECONDS);
55+
scheduledFuture = RETRY_EXECUTOR_SERVICE.scheduleWithFixedDelay(new CheckRunner(), 10, 30, TimeUnit.SECONDS);
5556
}
5657
}
5758

58-
// 一次最多提交maxSentSize个, 保证文件所也能被其他线程拿到
59-
private int maxSentSize = 20;
60-
6159
public void stop() {
6260
try {
6361
scheduledFuture.cancel(true);
@@ -73,46 +71,60 @@ public void stop() {
7371
*/
7472
private class CheckRunner implements Runnable {
7573

74+
// 一次最多提交maxSentSize个, 保证文件所也能被其他线程拿到
75+
private int maxSentSize = 100;
76+
7677
@Override
7778
public void run() {
7879
try {
7980
// 1. 检测 远程连接 是否可用
8081
if (!isRemotingEnable()) {
8182
return;
8283
}
83-
try {
84+
85+
int sentSize = 0;
86+
87+
List<KVPair<String, T>> kvPairs = null;
88+
do {
8489
failStore.open();
85-
int sentSize = 0;
8690

87-
List<KVPair<String, T>> kvPairs = failStore.fetchTop(batchSize, type);
91+
kvPairs = failStore.fetchTop(batchSize, type);
8892

89-
while (kvPairs != null && kvPairs.size() > 0) {
90-
List<T> values = new ArrayList<T>(kvPairs.size());
91-
List<String> keys = new ArrayList<String>(kvPairs.size());
92-
for (KVPair<String, T> kvPair : kvPairs) {
93-
keys.add(kvPair.getKey());
94-
values.add(kvPair.getValue());
95-
}
96-
if (retry(values)) {
97-
LOGGER.info("本地任务发送成功, {}", JSONUtils.toJSONString(values));
98-
failStore.delete(keys);
99-
} else {
100-
break;
101-
}
102-
sentSize += kvPairs.size();
103-
if (sentSize >= maxSentSize) {
104-
// 一次最多提交maxSentSize个, 保证文件所也能被其他线程拿到
105-
break;
93+
if (CollectionUtils.isEmpty(kvPairs)) {
94+
break;
95+
}
96+
97+
List<T> values = new ArrayList<T>(kvPairs.size());
98+
List<String> keys = new ArrayList<String>(kvPairs.size());
99+
for (KVPair<String, T> kvPair : kvPairs) {
100+
keys.add(kvPair.getKey());
101+
values.add(kvPair.getValue());
102+
}
103+
if (retry(values)) {
104+
LOGGER.info("本地任务发送成功, {}", JSONUtils.toJSONString(values));
105+
failStore.delete(keys);
106+
} else {
107+
break;
108+
}
109+
sentSize += kvPairs.size();
110+
if (sentSize >= maxSentSize) {
111+
// 一次最多提交maxSentSize个, 保证文件所也能被其他线程拿到
112+
try {
113+
Thread.sleep(1000L);
114+
} catch (InterruptedException e1) {
115+
LOGGER.warn(e1.getMessage(), e1);
106116
}
107-
kvPairs = failStore.fetchTop(batchSize, type);
108117
}
109-
} finally {
118+
110119
failStore.close();
111-
}
120+
121+
} while (CollectionUtils.isNotEmpty(kvPairs));
122+
112123
} catch (Throwable e) {
113124
LOGGER.error(e.getMessage(), e);
114125
}
115126
}
127+
116128
}
117129

118130
public void inSchedule(String key, T value) {

job-example/src/main/java/com/lts/job/example/api/JobClientTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.lts.job.example.api;
22

3-
import com.alibaba.fastjson.JSONObject;
43
import com.lts.job.client.JobClient;
54
import com.lts.job.client.RetryJobClient;
65
import com.lts.job.client.domain.Response;
@@ -22,8 +21,8 @@
2221
public class JobClientTest extends BaseJobClientTest {
2322

2423
public static void main(String[] args) throws IOException {
25-
// console();
26-
testProtector();
24+
console();
25+
// testProtector();
2726
}
2827

2928
public static void console() throws IOException {
@@ -39,8 +38,8 @@ public static void console() throws IOException {
3938
jobClient.addMasterChangeListener(new MasterChangeListenerImpl());
4039
// jobClient.setLoadBalance("consistenthash");
4140
// jobClient.addConfig("job.fail.store", "leveldb");
42-
// jobClient.addConfig("job.fail.store", "berkeleydb");
43-
jobClient.addConfig("job.fail.store", "rocksdb");
41+
jobClient.addConfig("job.fail.store", "berkeleydb");
42+
// jobClient.addConfig("job.fail.store", "rocksdb");
4443
jobClient.start();
4544

4645
JobClientTest jobClientTest = new JobClientTest();

0 commit comments

Comments
 (0)