Skip to content

Commit b2712c2

Browse files
committed
Merge pull request #31 from qq254963746/develop
定时任务在LTS Admin 中的添加和删除
2 parents 4adfbf9 + 81bcfce commit b2712c2

File tree

5 files changed

+22
-12
lines changed

5 files changed

+22
-12
lines changed

lts-core/src/main/java/com/lts/job/core/cluster/AbstractJobNode.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ final public void start() {
5858

5959
registry.register(node);
6060

61-
LOGGER.info("started!");
61+
LOGGER.info("Start success!");
6262

6363
} catch (Throwable e) {
64-
LOGGER.error("start failed!", e);
64+
LOGGER.error("Start failed!", e);
6565
}
6666
}
6767

@@ -72,18 +72,19 @@ final public void stop() {
7272
innerStop();
7373
remotingStop();
7474

75-
LOGGER.info("stop success!");
75+
LOGGER.info("Stop success!");
7676
} catch (Throwable e) {
77-
LOGGER.error("stop failed!", e);
77+
LOGGER.error("Stop failed!", e);
7878
}
7979
}
8080

8181
@Override
8282
public void destroy() {
8383
try {
8484
registry.destroy();
85+
LOGGER.info("Destroy success!");
8586
} catch (Throwable e) {
86-
LOGGER.error("destroy failed!", e);
87+
LOGGER.error("Destroy failed!", e);
8788
}
8889
}
8990

lts-core/src/main/java/com/lts/job/remoting/common/RemotingHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
*/
1212
public class RemotingHelper {
1313

14-
public static final String RemotingLogName = "LTSRemoting";
14+
public static final String RemotingLogName = "LTS.Remoting";
1515

1616
public static String exceptionSimpleDesc(final Exception e) {
1717
StringBuffer sb = new StringBuffer();

lts-job-client/src/main/java/com/lts/job/client/support/JobSubmitProtector.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ public JobSubmitProtector(int concurrentSize) {
3636
}
3737

3838
public Response execute(final List<Job> jobs, final JobSubmitExecutor<Response> jobSubmitExecutor) throws JobSubmitException {
39+
boolean acquire = false;
3940
try {
4041
try {
41-
boolean acquire = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
42+
acquire = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
4243
if (!acquire) {
4344
throw new JobSubmitProtectException(concurrentSize, errorMsg);
4445
}
@@ -47,7 +48,9 @@ public Response execute(final List<Job> jobs, final JobSubmitExecutor<Response>
4748
}
4849
return jobSubmitExecutor.execute(jobs);
4950
} finally {
50-
semaphore.release();
51+
if (acquire) {
52+
semaphore.release();
53+
}
5154
}
5255
}
5356

lts-queue/lts-queue-mongo/src/main/java/com/lts/job/queue/mongo/MongoExecutableJobQueue.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@ public boolean add(JobPo jobPo) {
7676

7777
@Override
7878
public JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity) {
79+
boolean acquire = false;
7980
try {
80-
boolean acquire = semaphore.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
81+
acquire = semaphore.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
8182
if (!acquire) {
8283
// 直接返回null
8384
return null;
@@ -99,7 +100,9 @@ public JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity) {
99100
.set("gmtModified", System.currentTimeMillis());
100101
return template.findAndModify(query, operations, false);
101102
} finally {
102-
semaphore.release();
103+
if (acquire) {
104+
semaphore.release();
105+
}
103106
}
104107
}
105108

lts-queue/lts-queue-mysql/src/main/java/com/lts/job/queue/mysql/MysqlExecutableJobQueue.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,9 @@ public boolean add(JobPo jobPo) {
110110

111111
@Override
112112
public JobPo take(final String taskTrackerNodeGroup, final String taskTrackerIdentity) {
113+
boolean acquire = false;
113114
try {
114-
boolean acquire = semaphore.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
115+
acquire = semaphore.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
115116
if (!acquire) {
116117
// 直接返回null
117118
return null;
@@ -157,7 +158,9 @@ public JobPo take(final String taskTrackerNodeGroup, final String taskTrackerIde
157158
} catch (SQLException e) {
158159
throw new JobQueueException(e);
159160
} finally {
160-
semaphore.release();
161+
if(acquire){
162+
semaphore.release();
163+
}
161164
}
162165
}
163166

0 commit comments

Comments
 (0)