Skip to content

Commit a568a75

Browse files
committed
Merge pull request #14 from qq254963746/develop
berkeleydb 的 FailStore 实现
2 parents a0df09e + 4535665 commit a568a75

File tree

16 files changed

+275
-139
lines changed

16 files changed

+275
-139
lines changed

job-core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,5 +82,9 @@
8282
<groupId>com.alibaba</groupId>
8383
<artifactId>druid</artifactId>
8484
</dependency>
85+
<dependency>
86+
<groupId>com.sleepycat</groupId>
87+
<artifactId>je</artifactId>
88+
</dependency>
8589
</dependencies>
8690
</project>

job-core/src/main/java/com/lts/job/core/failstore/FailStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import java.util.List;
77

88
/**
9-
* Created by hugui on 5/21/15.
9+
* Robert HG (254963746@qq.com) on 5/21/15.
1010
*/
1111
public interface FailStore {
1212

job-core/src/main/java/com/lts/job/core/failstore/FailStoreException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.lts.job.core.failstore;
22

33
/**
4-
* Created by hugui on 5/21/15.
4+
* Robert HG (254963746@qq.com) on 5/21/15.
55
*/
66
public class FailStoreException extends Exception {
77

job-core/src/main/java/com/lts/job/core/failstore/FailStoreFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import com.lts.job.core.extension.SPI;
66

77
/**
8-
* Created by hugui on 5/21/15.
8+
* Robert HG (254963746@qq.com) on 5/21/15.
99
*/
1010
@SPI("leveldb")
1111
public interface FailStoreFactory {
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package com.lts.job.core.failstore.berkeleydb;
2+
3+
import com.lts.job.core.cluster.Config;
4+
import com.lts.job.core.domain.KVPair;
5+
import com.lts.job.core.failstore.FailStore;
6+
import com.lts.job.core.failstore.FailStoreException;
7+
import com.lts.job.core.file.FileUtils;
8+
import com.lts.job.core.logger.Logger;
9+
import com.lts.job.core.logger.LoggerFactory;
10+
import com.lts.job.core.util.CollectionUtils;
11+
import com.lts.job.core.util.JSONUtils;
12+
import com.sleepycat.je.*;
13+
14+
import java.io.File;
15+
import java.lang.reflect.Type;
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
19+
/**
20+
* Robert HG (254963746@qq.com) on 5/26/15.
21+
*/
22+
public class BerkeleydbFailStore implements FailStore {
23+
24+
private static final Logger LOGGER = LoggerFactory.getLogger(BerkeleydbFailStore.class);
25+
private Environment environment;
26+
private Database db;
27+
private EnvironmentConfig envConfig;
28+
private File envHome;
29+
30+
public BerkeleydbFailStore(Config config) {
31+
try {
32+
envHome = FileUtils.createDirIfNotExist(config.getFailStorePath());
33+
envConfig = new EnvironmentConfig();
34+
// 如果不存在则创建一个
35+
envConfig.setAllowCreate(true);
36+
// 以只读方式打开,默认为false
37+
envConfig.setReadOnly(false);
38+
// 事务支持,如果为true,则表示当前环境支持事务处理,默认为false,不支持事务处理
39+
envConfig.setTransactional(true);
40+
// Configures the durability associated with transactions.
41+
envConfig.setDurability(Durability.COMMIT_SYNC);
42+
} catch (DatabaseException e) {
43+
throw new RuntimeException(e);
44+
}
45+
}
46+
47+
@Override
48+
public void open() throws FailStoreException {
49+
environment = new Environment(envHome, envConfig);
50+
DatabaseConfig dbConfig = new DatabaseConfig();
51+
dbConfig.setAllowCreate(true);
52+
dbConfig.setSortedDuplicates(false);
53+
dbConfig.setTransactional(true);
54+
try {
55+
db = environment.openDatabase(null, "lts", dbConfig);
56+
} catch (DatabaseException e) {
57+
throw new FailStoreException(e);
58+
}
59+
}
60+
61+
@Override
62+
public void put(String key, Object value) throws FailStoreException {
63+
try {
64+
String valueString = JSONUtils.toJSONString(value);
65+
OperationStatus status = db.put(null, new DatabaseEntry(key.getBytes("UTF-8")),
66+
new DatabaseEntry(valueString.getBytes("UTF-8")));
67+
} catch (Exception e) {
68+
throw new FailStoreException(e);
69+
}
70+
}
71+
72+
@Override
73+
public void delete(String key) throws FailStoreException {
74+
try {
75+
DatabaseEntry delKey = new DatabaseEntry();
76+
delKey.setData(key.getBytes("UTF-8"));
77+
OperationStatus status = db.delete(null, delKey);
78+
} catch (Exception e) {
79+
throw new FailStoreException(e);
80+
}
81+
}
82+
83+
@Override
84+
public void delete(List<String> keys) throws FailStoreException {
85+
if (CollectionUtils.isEmpty(keys)) {
86+
return;
87+
}
88+
for (String key : keys) {
89+
delete(key);
90+
}
91+
}
92+
93+
@Override
94+
public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStoreException {
95+
Cursor cursor = null;
96+
try {
97+
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>();
98+
99+
cursor = db.openCursor(null, CursorConfig.DEFAULT);
100+
DatabaseEntry foundKey = new DatabaseEntry();
101+
DatabaseEntry foundValue = new DatabaseEntry();
102+
while (cursor.getNext(foundKey, foundValue, LockMode.DEFAULT) ==
103+
OperationStatus.SUCCESS) {
104+
String key = new String(foundKey.getData(), "UTF-8");
105+
String valueString = new String(foundValue.getData(), "UTF-8");
106+
107+
T value = JSONUtils.parse(valueString, type);
108+
KVPair<String, T> pair = new KVPair<String, T>(key, value);
109+
list.add(pair);
110+
if (list.size() >= size) {
111+
break;
112+
}
113+
}
114+
return list;
115+
} catch (Exception e) {
116+
throw new FailStoreException(e);
117+
} finally {
118+
if (cursor != null) {
119+
try {
120+
cursor.close();
121+
} catch (DatabaseException e) {
122+
// do nothing
123+
LOGGER.warn("close cursor failed! ", e);
124+
}
125+
}
126+
}
127+
}
128+
129+
@Override
130+
public void close() throws FailStoreException {
131+
// do nothing
132+
try {
133+
if (db != null) {
134+
db.close();
135+
}
136+
if (environment != null) {
137+
environment.cleanLog();
138+
environment.close();
139+
}
140+
} catch (Exception e) {
141+
throw new FailStoreException(e);
142+
}
143+
}
144+
145+
@Override
146+
public void destroy() throws FailStoreException {
147+
environment.removeDatabase(null, db.getDatabaseName());
148+
environment.close();
149+
}
150+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.lts.job.core.failstore.berkeleydb;
2+
3+
import com.lts.job.core.cluster.Config;
4+
import com.lts.job.core.failstore.FailStore;
5+
import com.lts.job.core.failstore.FailStoreFactory;
6+
7+
/**
8+
* Robert HG (254963746@qq.com) on 5/26/15.
9+
*/
10+
public class BerkeleydbFailStoreFactory implements FailStoreFactory {
11+
@Override
12+
public FailStore getFailStore(Config config) {
13+
return new BerkeleydbFailStore(config);
14+
}
15+
}

job-core/src/main/java/com/lts/job/core/failstore/leveldb/LeveldbFailStore.java

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@
2020
import java.util.List;
2121

2222
/**
23-
* Created by hugui on 5/21/15.
23+
* Only a single process (possibly multi-threaded) can access a particular database at a time
24+
* Robert HG (254963746@qq.com) on 5/21/15.
2425
*/
2526
public class LeveldbFailStore implements FailStore {
2627

27-
// 文件锁 (同一时间只能有一个线程在 检查提交失败的任务)
28+
// 文件锁 (同一时间只能有一个线程访问leveldb文件)
2829
private FileAccessor dbLock;
2930
/**
3031
* 数据库目录
@@ -58,16 +59,24 @@ public void open() throws FailStoreException {
5859

5960
@Override
6061
public void put(String key, Object value) throws FailStoreException {
61-
String valueString = JSONUtils.toJSONString(value);
62-
db.put(key.getBytes(), valueString.getBytes());
62+
try {
63+
String valueString = JSONUtils.toJSONString(value);
64+
db.put(key.getBytes("UTF-8"), valueString.getBytes("UTF-8"));
65+
} catch (Exception e) {
66+
throw new FailStoreException(e);
67+
}
6368
}
6469

6570
@Override
6671
public void delete(String key) throws FailStoreException {
67-
if (key == null) {
68-
return;
72+
try {
73+
if (key == null) {
74+
return;
75+
}
76+
db.delete(key.getBytes("UTF-8"));
77+
} catch (Exception e) {
78+
throw new FailStoreException(e);
6979
}
70-
db.delete(key.getBytes());
7180
}
7281

7382
@Override
@@ -81,19 +90,23 @@ public void delete(List<String> keys) throws FailStoreException {
8190
}
8291

8392
@Override
84-
public <T> List<KVPair<String, T>> fetchTop(int size, Type type) {
85-
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
86-
DBIterator iterator = db.iterator();
87-
for (iterator.seekToLast(); iterator.hasPrev(); iterator.prev()) {
88-
String key = new String(iterator.peekPrev().getKey());
89-
T value = JSONUtils.parse(new String(iterator.peekPrev().getValue()), type);
90-
KVPair<String, T> pair = new KVPair<String, T>(key, value);
91-
list.add(pair);
92-
if (list.size() >= size) {
93-
break;
93+
public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStoreException {
94+
try {
95+
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
96+
DBIterator iterator = db.iterator();
97+
for (iterator.seekToLast(); iterator.hasPrev(); iterator.prev()) {
98+
String key = new String(iterator.peekPrev().getKey(), "UTF-8");
99+
T value = JSONUtils.parse(new String(iterator.peekPrev().getValue(), "UTF-8"), type);
100+
KVPair<String, T> pair = new KVPair<String, T>(key, value);
101+
list.add(pair);
102+
if (list.size() >= size) {
103+
break;
104+
}
94105
}
106+
return list;
107+
} catch (Exception e) {
108+
throw new FailStoreException(e);
95109
}
96-
return list;
97110
}
98111

99112
@Override

job-core/src/main/java/com/lts/job/core/failstore/leveldb/LeveldbFailStoreFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import com.lts.job.core.failstore.FailStoreFactory;
66

77
/**
8-
* Created by hugui on 5/21/15.
8+
* Robert HG (254963746@qq.com) on 5/21/15.
99
*/
1010
public class LeveldbFailStoreFactory implements FailStoreFactory {
1111
@Override

job-core/src/main/java/com/lts/job/core/file/FileUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,5 @@ public static String read(InputStream is) throws IOException {
4040
}
4141
return createTableSql.toString();
4242
}
43+
4344
}

0 commit comments

Comments
 (0)