Skip to content

Commit 141f328

Browse files
committed
Merge pull request #15 from qq254963746/develop
增加 rocksdb 的 FailStore 实现,并修复 文件锁bug
2 parents a568a75 + 135c707 commit 141f328

File tree

20 files changed

+817
-409
lines changed

20 files changed

+817
-409
lines changed

job-core/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
<dependency>
7373
<groupId>org.fusesource.leveldbjni</groupId>
7474
<artifactId>leveldbjni-all</artifactId>
75+
<scope>provided</scope>
7576
</dependency>
7677
<dependency>
7778
<groupId>commons-dbutils</groupId>
@@ -85,6 +86,12 @@
8586
<dependency>
8687
<groupId>com.sleepycat</groupId>
8788
<artifactId>je</artifactId>
89+
<scope>provided</scope>
90+
</dependency>
91+
<dependency>
92+
<groupId>org.rocksdb</groupId>
93+
<artifactId>rocksdbjni</artifactId>
94+
<scope>provided</scope>
8895
</dependency>
8996
</dependencies>
9097
</project>

job-core/src/main/java/com/lts/job/core/failstore/berkeleydb/BerkeleydbFailStore.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.lts.job.core.domain.KVPair;
55
import com.lts.job.core.failstore.FailStore;
66
import com.lts.job.core.failstore.FailStoreException;
7+
import com.lts.job.core.file.FileLock;
78
import com.lts.job.core.file.FileUtils;
89
import com.lts.job.core.logger.Logger;
910
import com.lts.job.core.logger.LoggerFactory;
@@ -26,10 +27,13 @@ public class BerkeleydbFailStore implements FailStore {
2627
private Database db;
2728
private EnvironmentConfig envConfig;
2829
private File envHome;
30+
private FileLock lock;
31+
DatabaseConfig dbConfig;
2932

3033
public BerkeleydbFailStore(Config config) {
3134
try {
32-
envHome = FileUtils.createDirIfNotExist(config.getFailStorePath());
35+
String failStorePath = config.getFailStorePath() + "berkeleydb/";
36+
envHome = FileUtils.createDirIfNotExist(failStorePath);
3337
envConfig = new EnvironmentConfig();
3438
// 如果不存在则创建一个
3539
envConfig.setAllowCreate(true);
@@ -39,21 +43,26 @@ public BerkeleydbFailStore(Config config) {
3943
envConfig.setTransactional(true);
4044
// Configures the durability associated with transactions.
4145
envConfig.setDurability(Durability.COMMIT_SYNC);
46+
47+
dbConfig = new DatabaseConfig();
48+
dbConfig.setAllowCreate(true);
49+
dbConfig.setSortedDuplicates(false);
50+
dbConfig.setTransactional(true);
51+
52+
lock = new FileLock(failStorePath + "___db.lock");
53+
4254
} catch (DatabaseException e) {
4355
throw new RuntimeException(e);
4456
}
4557
}
4658

4759
@Override
4860
public void open() throws FailStoreException {
49-
environment = new Environment(envHome, envConfig);
50-
DatabaseConfig dbConfig = new DatabaseConfig();
51-
dbConfig.setAllowCreate(true);
52-
dbConfig.setSortedDuplicates(false);
53-
dbConfig.setTransactional(true);
5461
try {
62+
lock.tryLock();
63+
environment = new Environment(envHome, envConfig);
5564
db = environment.openDatabase(null, "lts", dbConfig);
56-
} catch (DatabaseException e) {
65+
} catch (Exception e) {
5766
throw new FailStoreException(e);
5867
}
5968
}
@@ -119,7 +128,6 @@ public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStor
119128
try {
120129
cursor.close();
121130
} catch (DatabaseException e) {
122-
// do nothing
123131
LOGGER.warn("close cursor failed! ", e);
124132
}
125133
}
@@ -128,23 +136,32 @@ public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStor
128136

129137
@Override
130138
public void close() throws FailStoreException {
131-
// do nothing
132139
try {
133140
if (db != null) {
134141
db.close();
135142
}
136-
if (environment != null) {
143+
if (environment != null && environment.isValid()) {
137144
environment.cleanLog();
138145
environment.close();
139146
}
140147
} catch (Exception e) {
141148
throw new FailStoreException(e);
149+
} finally {
150+
lock.release();
142151
}
143152
}
144153

145154
@Override
146155
public void destroy() throws FailStoreException {
147-
environment.removeDatabase(null, db.getDatabaseName());
148-
environment.close();
156+
try {
157+
if (environment != null) {
158+
environment.removeDatabase(null, db.getDatabaseName());
159+
environment.close();
160+
}
161+
} catch (Exception e) {
162+
throw new FailStoreException(e);
163+
} finally {
164+
lock.delete();
165+
}
149166
}
150167
}

job-core/src/main/java/com/lts/job/core/failstore/leveldb/LeveldbFailStore.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
import com.lts.job.core.domain.KVPair;
55
import com.lts.job.core.failstore.FailStore;
66
import com.lts.job.core.failstore.FailStoreException;
7-
import com.lts.job.core.file.FileAccessor;
8-
import com.lts.job.core.file.FileException;
7+
import com.lts.job.core.file.FileLock;
98
import com.lts.job.core.file.FileUtils;
109
import com.lts.job.core.util.JSONUtils;
1110
import org.fusesource.leveldbjni.JniDBFactory;
@@ -26,7 +25,7 @@
2625
public class LeveldbFailStore implements FailStore {
2726

2827
// 文件锁 (同一时间只能有一个线程访问leveldb文件)
29-
private FileAccessor dbLock;
28+
private FileLock lock;
3029
/**
3130
* 数据库目录
3231
*/
@@ -37,20 +36,16 @@ public class LeveldbFailStore implements FailStore {
3736
private Options options;
3837

3938
public LeveldbFailStore(Config config) {
40-
dbPath = FileUtils.createDirIfNotExist(config.getFailStorePath());
39+
String failStorePath = config.getFailStorePath() + "leveldb/";
40+
dbPath = FileUtils.createDirIfNotExist(failStorePath);
4141
options = new Options();
42-
try {
43-
dbLock = new FileAccessor(config.getFailStorePath() + "___db.lock");
44-
dbLock.createIfNotExist();
45-
} catch (FileException e) {
46-
throw new RuntimeException(e);
47-
}
42+
lock = new FileLock(failStorePath + "___db.lock");
4843
}
4944

5045
@Override
5146
public void open() throws FailStoreException {
52-
dbLock.tryLock();
5347
try {
48+
lock.tryLock();
5449
db = JniDBFactory.factory.open(dbPath, options);
5550
} catch (IOException e) {
5651
throw new FailStoreException(e);
@@ -112,19 +107,23 @@ public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStor
112107
@Override
113108
public void close() throws FailStoreException {
114109
try {
115-
db.close();
116-
dbLock.unlock();
110+
if (db != null) {
111+
db.close();
112+
}
117113
} catch (IOException e) {
118114
throw new FailStoreException(e);
115+
} finally {
116+
lock.release();
119117
}
120118
}
121119

122120
public void destroy() throws FailStoreException {
123121
try {
124122
JniDBFactory.factory.destroy(dbPath, options);
125-
dbLock.delete();
126123
} catch (IOException e) {
127124
throw new FailStoreException(e);
125+
} finally {
126+
lock.delete();
128127
}
129128
}
130129
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package com.lts.job.core.failstore.rocksdb;
2+
3+
import com.lts.job.core.cluster.Config;
4+
import com.lts.job.core.domain.KVPair;
5+
import com.lts.job.core.failstore.FailStore;
6+
import com.lts.job.core.failstore.FailStoreException;
7+
import com.lts.job.core.file.FileLock;
8+
import com.lts.job.core.file.FileUtils;
9+
import com.lts.job.core.util.CollectionUtils;
10+
import com.lts.job.core.util.JSONUtils;
11+
import org.rocksdb.*;
12+
import org.rocksdb.util.SizeUnit;
13+
14+
import java.lang.reflect.Type;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
18+
/**
19+
* Robert HG (254963746@qq.com) on 5/27/15.
20+
*/
21+
public class RocksdbFailStore implements FailStore {
22+
23+
private RocksDB db = null;
24+
private Options options;
25+
private String failStorePath;
26+
private FileLock lock;
27+
28+
public RocksdbFailStore(Config config) {
29+
failStorePath = config.getFailStorePath() + "rocksdb/";
30+
FileUtils.createDirIfNotExist(failStorePath);
31+
options = new Options();
32+
options.setCreateIfMissing(true)
33+
.setWriteBufferSize(8 * SizeUnit.KB)
34+
.setMaxWriteBufferNumber(3)
35+
.setMaxBackgroundCompactions(10)
36+
.setCompressionType(CompressionType.SNAPPY_COMPRESSION)
37+
.setCompactionStyle(CompactionStyle.UNIVERSAL);
38+
39+
Filter bloomFilter = new BloomFilter(10);
40+
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
41+
tableConfig.setBlockCacheSize(64 * SizeUnit.KB)
42+
.setFilter(bloomFilter)
43+
.setCacheNumShardBits(6)
44+
.setBlockSizeDeviation(5)
45+
.setBlockRestartInterval(10)
46+
.setCacheIndexAndFilterBlocks(true)
47+
.setHashIndexAllowCollision(false)
48+
.setBlockCacheCompressedSize(64 * SizeUnit.KB)
49+
.setBlockCacheCompressedNumShardBits(10);
50+
51+
options.setTableFormatConfig(tableConfig);
52+
lock = new FileLock(failStorePath + "___db.lock");
53+
// TODO other settings
54+
}
55+
56+
@Override
57+
public void open() throws FailStoreException {
58+
try {
59+
lock.tryLock();
60+
db = RocksDB.open(options, failStorePath);
61+
} catch (Exception e) {
62+
throw new FailStoreException(e);
63+
}
64+
}
65+
66+
@Override
67+
public void put(String key, Object value) throws FailStoreException {
68+
String valueString = JSONUtils.toJSONString(value);
69+
WriteOptions writeOpts = new WriteOptions();
70+
try {
71+
writeOpts.setSync(true);
72+
writeOpts.setDisableWAL(true);
73+
db.put(writeOpts, key.getBytes("UTF-8"), valueString.getBytes("UTF-8"));
74+
} catch (Exception e) {
75+
throw new FailStoreException(e);
76+
} finally {
77+
writeOpts.dispose();
78+
}
79+
}
80+
81+
@Override
82+
public void delete(String key) throws FailStoreException {
83+
try {
84+
db.remove(key.getBytes("UTF-8"));
85+
} catch (Exception e) {
86+
throw new FailStoreException(e);
87+
}
88+
}
89+
90+
@Override
91+
public void delete(List<String> keys) throws FailStoreException {
92+
if (CollectionUtils.isEmpty(keys)) {
93+
return;
94+
}
95+
for (String key : keys) {
96+
delete(key);
97+
}
98+
}
99+
100+
@Override
101+
public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStoreException {
102+
RocksIterator iterator = null;
103+
try {
104+
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
105+
iterator = db.newIterator();
106+
for (iterator.seekToLast(); iterator.isValid(); iterator.prev()) {
107+
iterator.status();
108+
String key = new String(iterator.key(), "UTF-8");
109+
T value = JSONUtils.parse(new String(iterator.value(), "UTF-8"), type);
110+
KVPair<String, T> pair = new KVPair<String, T>(key, value);
111+
list.add(pair);
112+
if (list.size() >= size) {
113+
break;
114+
}
115+
}
116+
return list;
117+
} catch (Exception e) {
118+
throw new FailStoreException(e);
119+
} finally {
120+
if (iterator != null) {
121+
iterator.dispose();
122+
}
123+
}
124+
}
125+
126+
@Override
127+
public void close() throws FailStoreException {
128+
try {
129+
if (db != null) {
130+
db.close();
131+
}
132+
} catch (Exception e) {
133+
throw new FailStoreException(e);
134+
} finally {
135+
lock.release();
136+
}
137+
}
138+
139+
@Override
140+
public void destroy() throws FailStoreException {
141+
try {
142+
db.close();
143+
options.dispose();
144+
} catch (Exception e) {
145+
throw new FailStoreException(e);
146+
} finally {
147+
lock.delete();
148+
}
149+
}
150+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.lts.job.core.failstore.rocksdb;
2+
3+
import com.lts.job.core.cluster.Config;
4+
import com.lts.job.core.failstore.FailStore;
5+
import com.lts.job.core.failstore.FailStoreFactory;
6+
7+
/**
8+
* Robert HG (254963746@qq.com) on 5/27/15.
9+
*/
10+
public class RocksdbFailStoreFactory implements FailStoreFactory {
11+
12+
@Override
13+
public FailStore getFailStore(Config config) {
14+
return new RocksdbFailStore(config);
15+
}
16+
}

0 commit comments

Comments
 (0)