Skip to content

Commit 386cff7

Browse files
committed
fix bug and optimize capability
Signed-off-by: mitkey <mitkey@foxmail.com>
1 parent bfe57c1 commit 386cff7

File tree

1 file changed

+58
-54
lines changed

1 file changed

+58
-54
lines changed

src/main/java/io/socket/nativeclient/SocketClient.java

Lines changed: 58 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@
55
import java.io.IOException;
66
import java.net.InetSocketAddress;
77
import java.net.Socket;
8-
import java.util.Timer;
9-
import java.util.TimerTask;
10-
import java.util.concurrent.ExecutorService;
118
import java.util.concurrent.Executors;
9+
import java.util.concurrent.ScheduledExecutorService;
10+
import java.util.concurrent.TimeUnit;
1211

1312
import io.socket.nativeclient.IO.Options;
1413

@@ -20,8 +19,7 @@
2019
*/
2120
public class SocketClient {
2221

23-
private ExecutorService executorService = Executors.newCachedThreadPool();
24-
private Timer timer = new Timer();
22+
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
2523

2624
/** ReplyPollThread 是否堵塞 */
2725
private boolean blocked;
@@ -73,7 +71,7 @@ public boolean isConnected() {
7371
}
7472

7573
public void sendData(final byte[] data) {
76-
executorService.submit(new Runnable() {
74+
scheduledExecutorService.submit(new Runnable() {
7775
@Override
7876
public void run() {
7977
try {
@@ -97,74 +95,81 @@ private synchronized void connectTransport(InetSocketAddress socketAddress, Opti
9795
// 在建立连接或者发生错误之前,连接一直处于阻塞状态。
9896
socket.connect(socketAddress, opts.connectTimeout);
9997

100-
// 通知回调
98+
// 通知连接成功回调
10199
onCall.onConnect();
102100

101+
// 角标更新为 true
102+
connected = true;
103+
104+
// 开启任务线程处理接收服务器消息
105+
openTaskReceiveData();
106+
103107
// 开启定时任务用于检测 socket 是否已断开
104-
timer.schedule(new TimerTask() {
108+
openTaskCheckStatus();
109+
}
110+
111+
private void openTaskCheckStatus() {
112+
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
105113
@Override
106114
public void run() {
107115
try {
108116
socket.sendUrgentData(0xff);
109-
if (!connected) {
110-
// 开启服务器消息回复线程处理
111-
executorService.execute(new Runnable() {
112-
private BufferedInputStream bis;
113-
114-
@Override
115-
public void run() {
116-
while (isConnected()) {
117-
try {
118-
if (bis == null) {
119-
bis = new BufferedInputStream(socket.getInputStream());
120-
}
121-
122-
blocked = true;
123-
124-
// ”一次性“从输入流中读完
125-
int available = bis.available();
126-
if (available != 0) {
127-
byte[] buffer = new byte[available];
128-
int bytesRead = bis.read(buffer);
129-
// 不能使用 != -1 来判断是否读到完,因为 socket 的输入流只有 socket 断开时才会返回 -1
130-
if (bytesRead > 0) {
131-
transportData(buffer);
132-
}
133-
}
134-
135-
blocked = false;
136-
} catch (IOException e) {
137-
transportError(e);
138-
return;
139-
}
140-
}
141-
transportDisconnected();
142-
}
143-
});
144-
}
145-
connected = true;
146117
} catch (IOException e) {
147-
cancel();
148118
transportDisconnected();
149119
}
150120
}
151-
}, 0, 1000);
121+
}, 0, 50, TimeUnit.SECONDS);
122+
}
123+
124+
private void openTaskReceiveData() {
125+
if (!isConnected()) {
126+
return;
127+
}
128+
129+
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
130+
private BufferedInputStream bis;
131+
132+
@Override
133+
public void run() {
134+
if (isConnected()) {
135+
try {
136+
if (bis == null) {
137+
bis = new BufferedInputStream(socket.getInputStream());
138+
}
139+
140+
blocked = true;
141+
142+
// ”一次性“从输入流中读完
143+
int available = bis.available();
144+
if (available != 0) {
145+
byte[] buffer = new byte[available];
146+
int bytesRead = bis.read(buffer);
147+
// 不能使用 != -1 来判断是否读到完,因为 socket 的输入流只有 socket 断开时才会返回 -1
148+
if (bytesRead > 0) {
149+
transportData(buffer);
150+
}
151+
}
152+
153+
blocked = false;
154+
} catch (IOException e) {
155+
transportError(e);
156+
}
157+
}
158+
}
159+
}, 0, 10, TimeUnit.MILLISECONDS);
152160
}
153161

154162
private void transportData(byte[] respData) {
155163
onCall.onMessage(respData);
156164
}
157165

158166
private void transportDisconnected() {
159-
// 关闭任务执行器
160-
executorService.shutdownNow();
161-
162-
// 取消计时器
163-
timer.cancel();
164-
165167
// 角标更新为 false
166168
connected = false;
167169

170+
// 关闭任务执行器
171+
scheduledExecutorService.shutdownNow();
172+
168173
// 通知回调
169174
onCall.onDisconnect();
170175

@@ -181,7 +186,6 @@ private void transportDisconnected() {
181186

182187
private void transportError(Exception error) {
183188
onCall.onError(new SocketIOException(error));
184-
transportDisconnected();
185189
}
186190

187191
}

0 commit comments

Comments
 (0)