Skip to content

Commit ece7ccc

Browse files
author
胡贵
committed
performance logger and job queue
1 parent d783e50 commit ece7ccc

File tree

42 files changed

+242
-189
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+242
-189
lines changed

job-core/src/main/java/com/lts/job/core/Application.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package com.lts.job.core;
22

3+
import com.lts.job.core.cluster.Config;
34
import com.lts.job.core.cluster.MasterElector;
45
import com.lts.job.core.cluster.SubscribedNodeManager;
5-
import com.lts.job.core.cluster.Config;
66
import com.lts.job.core.protocol.command.CommandBodyWrapper;
7-
import com.lts.job.ec.EventCenter;
87

98
/**
109
* @author Robert HG (254963746@qq.com) on 8/17/14.
@@ -20,8 +19,6 @@ public abstract class Application {
2019
private MasterElector masterElector;
2120
// 节点通信CommandBody包装器
2221
private CommandBodyWrapper commandBodyWrapper;
23-
// 事件中心
24-
private EventCenter eventCenter;
2522

2623
public CommandBodyWrapper getCommandBodyWrapper() {
2724
return commandBodyWrapper;
@@ -55,12 +52,4 @@ public void setMasterElector(MasterElector masterElector) {
5552
this.masterElector = masterElector;
5653
}
5754

58-
public EventCenter getEventCenter() {
59-
return eventCenter;
60-
}
61-
62-
public void setEventCenter(EventCenter eventCenter) {
63-
this.eventCenter = eventCenter;
64-
}
65-
6655
}

job-core/src/main/java/com/lts/job/core/cluster/AbstractJobNode.java

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.lts.job.core.Application;
44
import com.lts.job.core.constant.EcTopic;
5+
import com.lts.job.core.extension.ExtensionLoader;
56
import com.lts.job.core.factory.JobNodeConfigFactory;
67
import com.lts.job.core.factory.NodeFactory;
78
import com.lts.job.core.listener.MasterChangeListener;
@@ -12,9 +13,10 @@
1213
import com.lts.job.core.registry.*;
1314
import com.lts.job.core.util.CollectionUtils;
1415
import com.lts.job.core.util.GenericsUtils;
16+
import com.lts.job.ec.EventCenterFactory;
1517
import com.lts.job.ec.EventInfo;
1618
import com.lts.job.ec.EventSubscriber;
17-
import com.lts.job.ec.JvmEventCenter;
19+
import com.lts.job.ec.injvm.InjvmEventCenter;
1820
import com.lts.job.ec.Observer;
1921
import com.lts.job.core.logger.Logger;
2022
import com.lts.job.core.logger.LoggerFactory;
@@ -35,42 +37,24 @@ public abstract class AbstractJobNode<T extends Node, App extends Application> i
3537
protected Config config;
3638
protected App application;
3739
private List<NodeChangeListener> nodeChangeListeners;
40+
private List<MasterChangeListener> masterChangeListeners;
41+
private EventCenterFactory eventCenterFactory = ExtensionLoader.getExtensionLoader(EventCenterFactory.class).getAdaptiveExtension();
3842

3943
public AbstractJobNode() {
4044
application = getApplication();
4145
config = JobNodeConfigFactory.getDefaultConfig();
4246
application.setConfig(config);
43-
// 事件中心
44-
application.setEventCenter(new JvmEventCenter());
45-
application.setCommandBodyWrapper(new CommandBodyWrapper(application));
46-
application.setMasterElector(new MasterElector(application));
4747
nodeChangeListeners = new ArrayList<NodeChangeListener>();
48+
masterChangeListeners = new ArrayList<MasterChangeListener>();
4849
}
4950

51+
5052
final public void start() {
5153
try {
54+
5255
// 初始化配置
5356
initConfig();
5457

55-
node = NodeFactory.create(getNodeClass(), config);
56-
config.setNodeType(node.getNodeType());
57-
58-
LOGGER.info("当前节点配置:{}", config);
59-
60-
// 监听节点 启用/禁用消息
61-
application.getEventCenter().subscribe(
62-
new String[]{EcTopic.NODE_DISABLE, EcTopic.NODE_ENABLE},
63-
new EventSubscriber(node.getIdentity(), new Observer() {
64-
@Override
65-
public void onObserved(EventInfo eventInfo) {
66-
if (EcTopic.NODE_DISABLE.equals(eventInfo.getTopic())) {
67-
nodeDisable();
68-
} else {
69-
nodeEnable();
70-
}
71-
}
72-
}));
73-
7458
innerStart();
7559

7660
initRegistry();
@@ -117,7 +101,7 @@ private void initRegistry() {
117101
// 用于master选举的监听器
118102
nodeChangeListeners.add(new MasterElectionListener(application));
119103
// 监听自己节点变化(如,当前节点被禁用了)
120-
nodeChangeListeners.add(new SelfChangeListener(application));
104+
nodeChangeListeners.add(new SelfChangeListener(config));
121105

122106
registry.subscribe(node, new NotifyListener() {
123107
private final Logger NOTIFY_LOGGER = LoggerFactory.getLogger(NotifyListener.class);
@@ -152,8 +136,28 @@ public void notify(NotifyEvent event, List<Node> nodes) {
152136
}
153137

154138
protected void initConfig() {
155-
// do nothing 让子类去实现
156-
139+
application.setCommandBodyWrapper(new CommandBodyWrapper(config));
140+
application.setMasterElector(new MasterElector(application));
141+
application.getMasterElector().addMasterChangeListener(masterChangeListeners);
142+
143+
node = NodeFactory.create(getNodeClass(), config);
144+
config.setNodeType(node.getNodeType());
145+
146+
LOGGER.info("当前节点配置:{}", config);
147+
148+
// 监听节点 启用/禁用消息
149+
eventCenterFactory.getEventCenter(config).subscribe(
150+
new String[]{EcTopic.NODE_DISABLE, EcTopic.NODE_ENABLE},
151+
new EventSubscriber(node.getIdentity(), new Observer() {
152+
@Override
153+
public void onObserved(EventInfo eventInfo) {
154+
if (EcTopic.NODE_DISABLE.equals(eventInfo.getTopic())) {
155+
nodeDisable();
156+
} else {
157+
nodeEnable();
158+
}
159+
}
160+
}));
157161
}
158162

159163
protected abstract void innerStart();
@@ -228,7 +232,9 @@ public void addNodeChangeListener(NodeChangeListener notifyListener) {
228232
* @param masterChangeListener
229233
*/
230234
public void addMasterChangeListener(MasterChangeListener masterChangeListener) {
231-
application.getMasterElector().addMasterChangeListener(masterChangeListener);
235+
if (masterChangeListener != null) {
236+
masterChangeListeners.add(masterChangeListener);
237+
}
232238
}
233239

234240
/**

job-core/src/main/java/com/lts/job/core/cluster/MasterElector.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import com.lts.job.core.logger.LoggerFactory;
88

99
import java.util.ArrayList;
10+
import java.util.Arrays;
1011
import java.util.List;
12+
import java.util.concurrent.CopyOnWriteArrayList;
1113

1214
/**
1315
* @author Robert HG (254963746@qq.com) on 8/23/14.
@@ -29,10 +31,16 @@ public MasterElector(Application application) {
2931
}
3032

3133
public void addMasterChangeListener(MasterChangeListener masterChangeListener) {
34+
addMasterChangeListener(new CopyOnWriteArrayList<MasterChangeListener>(Arrays.asList(masterChangeListener)));
35+
}
36+
37+
public void addMasterChangeListener(List<MasterChangeListener> masterChangeListeners) {
3238
if (masterChangeListenerList == null) {
33-
masterChangeListenerList = new ArrayList<MasterChangeListener>();
39+
masterChangeListenerList = new CopyOnWriteArrayList<MasterChangeListener>();
40+
}
41+
if (CollectionUtils.isNotEmpty(masterChangeListeners)) {
42+
masterChangeListenerList.addAll(masterChangeListeners);
3443
}
35-
masterChangeListenerList.add(masterChangeListener);
3644
}
3745

3846
public void addNodes(List<Node> nodes) {

job-core/src/main/java/com/lts/job/core/listener/SelfChangeListener.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package com.lts.job.core.listener;
22

3-
import com.lts.job.core.Application;
3+
import com.lts.job.core.cluster.Config;
44
import com.lts.job.core.cluster.Node;
55
import com.lts.job.core.cluster.NodeType;
66
import com.lts.job.core.constant.EcTopic;
7-
import com.lts.job.core.cluster.Config;
7+
import com.lts.job.core.extension.ExtensionLoader;
88
import com.lts.job.core.util.CollectionUtils;
99
import com.lts.job.ec.EventCenter;
10+
import com.lts.job.ec.EventCenterFactory;
1011
import com.lts.job.ec.EventInfo;
1112

1213
import java.util.List;
@@ -20,10 +21,11 @@ public class SelfChangeListener implements NodeChangeListener {
2021

2122
private Config config;
2223
private EventCenter eventCenter;
24+
private EventCenterFactory eventCenterFactory = ExtensionLoader.getExtensionLoader(EventCenterFactory.class).getAdaptiveExtension();
2325

24-
public SelfChangeListener(Application application) {
25-
this.config = application.getConfig();
26-
this.eventCenter = application.getEventCenter();
26+
public SelfChangeListener(Config config) {
27+
this.config = config;
28+
this.eventCenter = eventCenterFactory.getEventCenter(config);
2729
}
2830

2931

job-core/src/main/java/com/lts/job/core/logger/support/AbstractLogger.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,39 +11,39 @@ public abstract class AbstractLogger implements Logger {
1111
@Override
1212
public void trace(String format, Object... arguments) {
1313
if (isTraceEnabled()) {
14-
FormattingTuple ft = MessageFormatter.format(format, arguments);
14+
FormattingTuple ft = MessageFormatter.arrayFormat(format, arguments);
1515
trace(ft.getMessage(), ft.getThrowable());
1616
}
1717
}
1818

1919
@Override
2020
public void debug(String format, Object... arguments) {
2121
if (isDebugEnabled()) {
22-
FormattingTuple ft = MessageFormatter.format(format, arguments);
22+
FormattingTuple ft = MessageFormatter.arrayFormat(format, arguments);
2323
debug(ft.getMessage(), ft.getThrowable());
2424
}
2525
}
2626

2727
@Override
2828
public void info(String format, Object... arguments) {
2929
if (isInfoEnabled()) {
30-
FormattingTuple ft = MessageFormatter.format(format, arguments);
30+
FormattingTuple ft = MessageFormatter.arrayFormat(format, arguments);
3131
info(ft.getMessage(), ft.getThrowable());
3232
}
3333
}
3434

3535
@Override
3636
public void warn(String format, Object... arguments) {
3737
if (isWarnEnabled()) {
38-
FormattingTuple ft = MessageFormatter.format(format, arguments);
38+
FormattingTuple ft = MessageFormatter.arrayFormat(format, arguments);
3939
warn(ft.getMessage(), ft.getThrowable());
4040
}
4141
}
4242

4343
@Override
4444
public void error(String format, Object... arguments) {
4545
if (isErrorEnabled()) {
46-
FormattingTuple ft = MessageFormatter.format(format, arguments);
46+
FormattingTuple ft = MessageFormatter.arrayFormat(format, arguments);
4747
error(ft.getMessage(), ft.getThrowable());
4848
}
4949
}

job-core/src/main/java/com/lts/job/core/protocol/command/CommandBodyWrapper.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
package com.lts.job.core.protocol.command;
22

3-
import com.lts.job.core.Application;
3+
import com.lts.job.core.cluster.Config;
44

55
/**
66
* 用于设置CommandBody 的基础信息
77
* Robert HG (254963746@qq.com) on 3/13/15.
88
*/
99
public class CommandBodyWrapper {
1010

11-
private Application application;
11+
private Config config;
1212

13-
public CommandBodyWrapper(Application application) {
14-
this.application = application;
13+
public CommandBodyWrapper(Config config) {
14+
this.config = config;
1515
}
1616

1717
public <T extends AbstractCommandBody> T wrapper(T commandBody) {
18-
commandBody.setNodeGroup(application.getConfig().getNodeGroup());
19-
commandBody.setNodeType(application.getConfig().getNodeType().name());
20-
commandBody.setIdentity(application.getConfig().getIdentity());
18+
commandBody.setNodeGroup(config.getNodeGroup());
19+
commandBody.setNodeType(config.getNodeType().name());
20+
commandBody.setIdentity(config.getIdentity());
2121
return commandBody;
2222
}
2323

job-core/src/main/java/com/lts/job/core/Version.java renamed to job-core/src/main/java/com/lts/job/core/util/Version.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
package com.lts.job.core;
1+
package com.lts.job.core.util;
22

33
import com.lts.job.core.logger.Logger;
44
import com.lts.job.core.logger.LoggerFactory;
5-
import com.lts.job.core.util.ClassHelper;
65

76
import java.net.URL;
87
import java.security.CodeSource;
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.lts.job.ec;
2+
3+
import com.lts.job.core.cluster.Config;
4+
import com.lts.job.core.extension.Adaptive;
5+
import com.lts.job.core.extension.SPI;
6+
7+
/**
8+
* @author Robert HG (254963746@qq.com) on 5/19/15.
9+
*/
10+
@SPI("injvm")
11+
public interface EventCenterFactory {
12+
13+
@Adaptive("event.center")
14+
EventCenter getEventCenter(Config config);
15+
16+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.lts.job.ec.injvm;
2+
3+
import com.lts.job.core.cluster.Config;
4+
import com.lts.job.ec.EventCenter;
5+
import com.lts.job.ec.EventCenterFactory;
6+
7+
/**
8+
* @author Robert HG (254963746@qq.com) on 5/19/15.
9+
*/
10+
public class InJvmEventCenterFactory implements EventCenterFactory {
11+
12+
@Override
13+
public EventCenter getEventCenter(Config config) {
14+
return new InjvmEventCenter();
15+
}
16+
}

job-core/src/main/java/com/lts/job/ec/JvmEventCenter.java renamed to job-core/src/main/java/com/lts/job/ec/injvm/InjvmEventCenter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
package com.lts.job.ec;
1+
package com.lts.job.ec.injvm;
22

33
import com.lts.job.core.constant.Constants;
44
import com.lts.job.core.util.ConcurrentHashSet;
55
import com.lts.job.core.util.JSONUtils;
66
import com.lts.job.core.logger.Logger;
77
import com.lts.job.core.logger.LoggerFactory;
8+
import com.lts.job.ec.EventCenter;
9+
import com.lts.job.ec.EventInfo;
10+
import com.lts.job.ec.EventSubscriber;
811

912
import java.util.Map;
1013
import java.util.Set;
@@ -16,7 +19,7 @@
1619
* 在一个jvm中的pub sub 简易实现
1720
* @author Robert HG (254963746@qq.com) on 5/12/15.
1821
*/
19-
public class JvmEventCenter implements EventCenter{
22+
public class InjvmEventCenter implements EventCenter {
2023

2124
private static final Logger LOGGER = LoggerFactory.getLogger(EventCenter.class.getName());
2225

0 commit comments

Comments
 (0)