Skip to content

Commit 7bb95cd

Browse files
author
胡贵
committed
add retry scheduler name
1 parent 96a5a28 commit 7bb95cd

File tree

6 files changed

+49
-14
lines changed

6 files changed

+49
-14
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
5454
###LTS Admin
5555
![Aaron Swartz](https://raw.githubusercontent.com/qq254963746/light-task-schedule/master/doc/LTS_Admin.png)
5656
###调用示例
57-
下面提供的是最简单的配置方式。更多配置请查看 [lts-example](https://github.com/qq254963746/light-task-schedule/tree/master/lts-example/src/main/java/com/lts/job/example/api) 模块下的 API 调用方式例子.
57+
下面提供的是最简单的配置方式。更多配置请查看 [lts-example](https://github.com/qq254963746/light-task-schedule/tree/master/lts-example/src/main/java/com/lts/example/api) 模块下的 API 调用方式例子.
5858

5959
####JobTracker 端
6060
```java
@@ -85,7 +85,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
8585
public void run(Job job) throws Throwable {
8686
System.out.println("我要执行"+ job);
8787
System.out.println(job.getParam("shopId"));
88-
// TODO 用户自己的业务逻辑
88+
// TODO 用户自己的业务逻辑, 应该保证幂等
8989
try {
9090
Thread.sleep(5*1000L);
9191
} catch (InterruptedException e) {

lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@
77
import com.lts.core.failstore.FailStore;
88
import com.lts.core.failstore.FailStoreException;
99
import org.fusesource.leveldbjni.JniDBFactory;
10-
import org.iq80.leveldb.DB;
11-
import org.iq80.leveldb.DBIterator;
12-
import org.iq80.leveldb.Options;
10+
import org.iq80.leveldb.*;
1311

1412
import java.io.File;
1513
import java.io.IOException;
14+
import java.io.UnsupportedEncodingException;
1615
import java.lang.reflect.Type;
1716
import java.util.ArrayList;
1817
import java.util.List;
@@ -39,13 +38,22 @@ public LeveldbFailStore(String failStorePath) {
3938
failStorePath = failStorePath + "/leveldb/";
4039
dbPath = FileUtils.createDirIfNotExist(failStorePath);
4140
options = new Options();
41+
options.createIfMissing(true);
42+
options.cacheSize(100 * 1024 * 1024); // 100M
43+
// options.logger(new Logger() {
44+
// @Override
45+
// public void log(String message) {
46+
// System.out.println(message);
47+
// }
48+
// });
4249
lock = new FileLock(failStorePath + "___db.lock");
4350
}
4451

4552
@Override
4653
public void open() throws FailStoreException {
4754
try {
4855
lock.tryLock();
56+
JniDBFactory.factory.repair(dbPath, options);
4957
db = JniDBFactory.factory.open(dbPath, options);
5058
} catch (IOException e) {
5159
throw new FailStoreException(e);
@@ -79,16 +87,30 @@ public void delete(List<String> keys) throws FailStoreException {
7987
if (keys == null || keys.size() == 0) {
8088
return;
8189
}
82-
for (String key : keys) {
83-
delete(key);
90+
WriteBatch batch = db.createWriteBatch();
91+
try {
92+
93+
for (String key : keys) {
94+
batch.delete(key.getBytes("UTF-8"));
95+
}
96+
db.write(batch);
97+
} catch (UnsupportedEncodingException e) {
98+
throw new FailStoreException(e);
99+
} finally {
100+
try {
101+
batch.close();
102+
} catch (IOException e) {
103+
throw new FailStoreException(e);
104+
}
84105
}
85106
}
86107

87108
@Override
88109
public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStoreException {
110+
DBIterator iterator = null;
89111
try {
90112
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
91-
DBIterator iterator = db.iterator();
113+
iterator = db.iterator();
92114
for (iterator.seekToFirst(); iterator.hasNext(); iterator.next()) {
93115
Map.Entry<byte[], byte[]> entry = iterator.peekNext();
94116
String key = new String(entry.getKey(), "UTF-8");
@@ -102,6 +124,14 @@ public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStor
102124
return list;
103125
} catch (Exception e) {
104126
throw new FailStoreException(e);
127+
} finally {
128+
if (iterator != null) {
129+
try {
130+
iterator.close();
131+
} catch (IOException e) {
132+
throw new FailStoreException(e);
133+
}
134+
}
105135
}
106136
}
107137

lts-core/src/main/java/com/lts/core/failstore/rocksdb/RocksdbFailStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStor
103103
try {
104104
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
105105
iterator = db.newIterator();
106-
for (iterator.seekToLast(); iterator.isValid(); iterator.prev()) {
106+
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
107107
iterator.status();
108108
String key = new String(iterator.key(), "UTF-8");
109109
T value = JSONUtils.parse(new String(iterator.value(), "UTF-8"), type);

lts-core/src/test/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStoreTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,11 @@ public void setup() throws FailStoreException {
3737
public void put() throws FailStoreException {
3838
Job job = new Job();
3939
job.setTaskId("2131232");
40-
failStore.put(key, job);
41-
failStore.close();
40+
for (int i = 0; i < 100; i++) {
41+
failStore.put(key + "" + i, job);
42+
}
4243
System.out.println("这里debug测试多线程");
43-
// fetchTop();
44+
failStore.close();
4445
}
4546

4647
@Test

lts-core/src/test/java/com/lts/core/failstore/leveldb/LeveldbFailStoreTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ public void setup() throws FailStoreException {
3232
public void put() throws FailStoreException {
3333
Job job = new Job();
3434
job.setTaskId("2131232");
35-
failStore.put(key, job);
35+
for (int i = 0; i < 100; i++) {
36+
failStore.put(key + "" + i, job);
37+
}
3638
System.out.println("这里debug测试多线程");
3739
failStore.close();
3840
}

lts-core/src/test/java/com/lts/core/failstore/rocksdb/RocksdbFailStoreTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ public void setup() throws FailStoreException {
3737
public void put() throws FailStoreException {
3838
Job job = new Job();
3939
job.setTaskId("2131232");
40-
failStore.put(key, job);
40+
for (int i = 0; i < 100; i++) {
41+
failStore.put(key + "" + i, job);
42+
}
4143
System.out.println("这里debug测试多线程");
4244
failStore.close();
4345
}

0 commit comments

Comments
 (0)