Skip to content

Commit 7b8c244

Browse files
committed
将 JobClient 和 TaskTracker 传送失败的信息 从文件 改为 存储到 leveldb 中
1 parent b6acc4e commit 7b8c244

File tree

11 files changed

+318
-148
lines changed

11 files changed

+318
-148
lines changed

job-client/src/main/java/com/lts/job/client/RetryJobClient.java

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,22 @@
44
import com.lts.job.client.domain.ResponseCode;
55
import com.lts.job.core.domain.Job;
66
import com.lts.job.client.domain.Response;
7-
import com.lts.job.core.file.FileAccessor;
87
import com.lts.job.core.file.FileException;
98
import com.lts.job.core.file.Line;
109
import com.lts.job.core.support.RetryScheduler;
11-
import com.lts.job.core.util.JsonUtils;
10+
import com.lts.job.core.util.JSONUtils;
1211

1312
import java.util.ArrayList;
1413
import java.util.Arrays;
1514
import java.util.List;
1615

1716
/**
1817
* @author Robert HG (254963746@qq.com) on 8/14/14.
19-
* 重试 客户端, 如果 没有可用的JobTracker, 那么存文件, 定时重试
18+
* 重试 客户端, 如果 没有可用的JobTracker, 那么存文件, 定时重试
2019
*/
2120
public class RetryJobClient extends JobClient<JobClientNode> {
2221

2322
private RetryScheduler retryScheduler;
24-
private FileAccessor fileAccessor;
2523

2624
@Override
2725
protected void nodeStart() {
@@ -37,8 +35,6 @@ protected boolean retry(List<Job> jobs) {
3735
return superSubmitJob(jobs).isSuccess();
3836
}
3937
};
40-
fileAccessor = retryScheduler.getFileAccessor();
41-
4238
super.nodeStart();
4339
retryScheduler.start();
4440
}
@@ -59,22 +55,13 @@ public Response submitJob(List<Job> jobs) {
5955
Response response = superSubmitJob(jobs);
6056

6157
if (!response.isSuccess()) {
62-
// 存储文件
63-
List<Line> lines = new ArrayList<Line>();
64-
for (Job job : response.getFailedJobs()) {
65-
String line = JsonUtils.objectToJsonString(job);
66-
lines.add(new Line(line));
67-
}
68-
6958
try {
70-
if (fileAccessor == null) {
71-
throw new RuntimeException("save file error ! can not get file accessor !");
59+
for (Job job : response.getFailedJobs()) {
60+
retryScheduler.inSchedule(job.getTaskId(), job);
7261
}
73-
fileAccessor.addLines(lines);
7462
response.setSuccess(true);
7563
response.setCode(ResponseCode.FAILED_AND_SAVE_FILE);
76-
77-
} catch (FileException e) {
64+
} catch (Exception e) {
7865
response.setSuccess(false);
7966
response.setMsg(e.getMessage());
8067
}
@@ -83,7 +70,7 @@ public Response submitJob(List<Job> jobs) {
8370
return response;
8471
}
8572

86-
private Response superSubmitJob(List<Job> jobs){
73+
private Response superSubmitJob(List<Job> jobs) {
8774
return super.submitJob(jobs);
8875
}
8976
}

job-core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,10 @@
2222
<artifactId>morphia</artifactId>
2323
<version>0.104</version>
2424
</dependency>
25+
<dependency>
26+
<groupId>org.fusesource.leveldbjni</groupId>
27+
<artifactId>leveldbjni-all</artifactId>
28+
<version>1.8</version>
29+
</dependency>
2530
</dependencies>
2631
</project>

job-core/src/main/java/com/lts/job/core/domain/JobNodeConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
/**
66
* @author Robert HG (254963746@qq.com) on 8/20/14.
7-
* 任务节点配置
7+
* 任务节点配置
88
*/
99
public class JobNodeConfig {
1010

@@ -100,7 +100,7 @@ public void setJobInfoSavePath(String jobInfoSavePath) {
100100
}
101101

102102
public String getFilePath() {
103-
return jobInfoSavePath + "/" + nodeGroup + ".info";
103+
return jobInfoSavePath + "/" + nodeType + "/" + nodeGroup + "/";
104104
}
105105

106106
@Override
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.lts.job.core.domain;
2+
3+
/**
4+
* Created by hugui on 3/6/15.
5+
*/
6+
public class KVPair<Key, Value> {
7+
private Key key;
8+
private Value value;
9+
10+
public KVPair(Key key, Value value) {
11+
this.key = key;
12+
this.value = value;
13+
}
14+
15+
public Key getKey() {
16+
return key;
17+
}
18+
19+
public void setKey(Key key) {
20+
this.key = key;
21+
}
22+
23+
public Value getValue() {
24+
return value;
25+
}
26+
27+
public void setValue(Value value) {
28+
this.value = value;
29+
}
30+
}
31+

job-core/src/main/java/com/lts/job/core/file/FileAccessor.java

Lines changed: 23 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
/**
1414
* @author Robert HG (254963746@qq.com) on 8/14/14.
15-
* 文件访问器 (多进程多线程互斥)
15+
* 文件访问器 (多进程多线程互斥)
1616
*/
1717
public class FileAccessor {
1818

@@ -21,6 +21,10 @@ public class FileAccessor {
2121
private static final Logger LOGGER = LoggerFactory.getLogger(FileAccessor.class);
2222
private File file;
2323

24+
RandomAccessFile randomAccessFile = null;
25+
FileChannel channel = null;
26+
FileLock lock = null;
27+
2428
public FileAccessor(String filename) throws FileException {
2529
file = new File(filename);
2630
}
@@ -49,16 +53,8 @@ public boolean exists() {
4953
return file.exists();
5054
}
5155

52-
public void create() throws FileException {
53-
if (!file.exists()) {
54-
// 创建父目录
55-
file.getParentFile().mkdirs();
56-
try {
57-
file.createNewFile();
58-
} catch (IOException e) {
59-
throw new FileException("create file[" + file.getAbsolutePath() + "] failed!", e, FileException.FILE_CREATE);
60-
}
61-
}
56+
public void createIfNotExist(){
57+
FileUtils.createFileIfNotExist(file.getAbsolutePath());
6258
}
6359

6460
/**
@@ -70,7 +66,7 @@ public void create() throws FileException {
7066
*/
7167
public static FileAccessor create(String filename) throws FileException {
7268
FileAccessor fileAccessor = new FileAccessor(filename);
73-
fileAccessor.create();
69+
fileAccessor.createIfNotExist();
7470
return fileAccessor;
7571
}
7672

@@ -119,13 +115,8 @@ public void addLines(List<Line> lines) throws FileException {
119115
public List<Line> readLines() throws FileException {
120116

121117
List<Line> lines = new ArrayList<Line>();
122-
RandomAccessFile randomAccessFile = null;
123-
FileChannel channel = null;
124-
FileLock lock = null;
125118
try {
126-
randomAccessFile = new RandomAccessFile(file, "rw");
127-
channel = randomAccessFile.getChannel();
128-
tryLock(lock, channel);
119+
tryLock();
129120

130121
BufferedReader reader = new BufferedReader(new FileReader(randomAccessFile.getFD()));
131122
// 读取所有内容
@@ -139,7 +130,7 @@ public List<Line> readLines() throws FileException {
139130
} catch (Exception e) {
140131
throw new FileException(e, FileException.FILE_CONTENT_GET);
141132
} finally {
142-
closeFile(lock, channel, randomAccessFile);
133+
unlock();
143134
}
144135
return lines;
145136
}
@@ -152,14 +143,8 @@ public List<Line> readLines() throws FileException {
152143
public void deleteFirstLines(int num) throws FileException {
153144

154145
List<Line> lines = new ArrayList<Line>();
155-
RandomAccessFile randomAccessFile = null;
156-
FileChannel channel = null;
157-
FileLock lock = null;
158146
try {
159-
randomAccessFile = new RandomAccessFile(file, "rw");
160-
channel = randomAccessFile.getChannel();
161-
162-
tryLock(lock, channel);
147+
tryLock();
163148

164149
BufferedReader reader = new BufferedReader(new FileReader(randomAccessFile.getFD()));
165150
// 读取所有内容
@@ -184,29 +169,23 @@ public void deleteFirstLines(int num) throws FileException {
184169
} catch (Exception e) {
185170
throw new FileException(e, FileException.FILE_CONTENT_GET);
186171
} finally {
187-
closeFile(lock, channel, randomAccessFile);
172+
unlock();
188173
}
189174
}
190175

191176
/**
192177
* 清空文件
193178
*/
194179
public void empty() throws FileException {
195-
RandomAccessFile randomAccessFile = null;
196-
FileChannel channel = null;
197-
FileLock lock = null;
198180
try {
199-
randomAccessFile = new RandomAccessFile(file, "rw");
200-
channel = randomAccessFile.getChannel();
201-
202-
tryLock(lock, channel);
181+
tryLock();
203182

204183
channel.truncate(0);
205184

206185
} catch (Exception e) {
207186
throw new FileException(e, FileException.FILE_CONTENT_EMPTY);
208187
} finally {
209-
closeFile(lock, channel, randomAccessFile);
188+
unlock();
210189
}
211190
}
212191

@@ -215,15 +194,8 @@ public void empty() throws FileException {
215194
* 如果当前 修改时间和给定相同,那么情况
216195
*/
217196
public boolean compareAndEmpty(Long lastModified) throws FileException {
218-
219-
RandomAccessFile randomAccessFile = null;
220-
FileChannel channel = null;
221-
FileLock lock = null;
222197
try {
223-
randomAccessFile = new RandomAccessFile(file, "rw");
224-
channel = randomAccessFile.getChannel();
225-
226-
tryLock(lock, channel);
198+
tryLock();
227199

228200
if (lastModified == lastModified()) {
229201
channel.truncate(0);
@@ -234,7 +206,7 @@ public boolean compareAndEmpty(Long lastModified) throws FileException {
234206
} catch (Exception e) {
235207
throw new FileException(e, FileException.FILE_CONTENT_EMPTY);
236208
} finally {
237-
closeFile(lock, channel, randomAccessFile);
209+
unlock();
238210
}
239211
}
240212

@@ -244,34 +216,27 @@ public boolean compareAndEmpty(Long lastModified) throws FileException {
244216
* @throws FileException
245217
*/
246218
public boolean isEmpty() throws FileException {
247-
RandomAccessFile randomAccessFile = null;
248-
FileChannel channel = null;
249-
FileLock lock = null;
250219
try {
251-
randomAccessFile = new RandomAccessFile(file, "rw");
252-
channel = randomAccessFile.getChannel();
253-
254-
tryLock(lock, channel);
220+
tryLock();
255221

256222
return randomAccessFile.length() == 0;
257223

258224
} catch (Exception e) {
259225
throw new FileException(e, FileException.FILE_CONTENT_EMPTY);
260226
} finally {
261-
closeFile(lock, channel, randomAccessFile);
227+
unlock();
262228
}
263229
}
264230

265231
/**
266232
* 获得锁
267-
*
268-
* @param lock
269-
* @param channel
270233
*/
271-
private void tryLock(FileLock lock, FileChannel channel) {
234+
public void tryLock() {
272235
while (true) {
273-
274236
try {
237+
randomAccessFile = new RandomAccessFile(file, "rw");
238+
channel = randomAccessFile.getChannel();
239+
275240
lock = channel.tryLock();
276241
if (lock == null) {
277242
throw new AccessDeniedException("can not get file lock!");
@@ -291,12 +256,8 @@ private void tryLock(FileLock lock, FileChannel channel) {
291256

292257
/**
293258
* 关闭文件
294-
*
295-
* @param lock
296-
* @param channel
297-
* @param randomAccessFile
298259
*/
299-
private void closeFile(FileLock lock, FileChannel channel, RandomAccessFile randomAccessFile) {
260+
public void unlock() {
300261

301262
if (lock != null) {
302263
try {
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.lts.job.core.file;
2+
3+
import java.io.File;
4+
import java.io.IOException;
5+
6+
/**
7+
* Created by hugui on 3/6/15.
8+
*/
9+
public class FileUtils {
10+
11+
public static File createFileIfNotExist(String path) {
12+
File file = new File(path);
13+
if (!file.exists()) {
14+
// 创建父目录
15+
file.getParentFile().mkdirs();
16+
try {
17+
file.createNewFile();
18+
} catch (IOException e) {
19+
throw new RuntimeException("create file[" + file.getAbsolutePath() + "] failed!", e);
20+
}
21+
}
22+
return file;
23+
}
24+
25+
public static File createDirIfNotExist(String path) {
26+
File file = new File(path);
27+
if (!file.exists()) {
28+
// 创建父目录
29+
file.getParentFile().mkdirs();
30+
file.mkdir();
31+
}
32+
return file;
33+
}
34+
}

0 commit comments

Comments
 (0)