Skip to content

Commit db65cd2

Browse files
committed
Improve detecting blockchains
1 parent 25e4f24 commit db65cd2

File tree

6 files changed

+67
-43
lines changed

6 files changed

+67
-43
lines changed

src/main/java/com/lbry/globe/Main.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.lbry.globe.thread.DHTNodeFinderThread;
88
import com.lbry.globe.thread.HubNodeFinderThread;
99
import com.lbry.globe.util.GeoIP;
10+
import com.lbry.globe.util.NamedThreadFactory;
1011

1112
import io.netty.bootstrap.ServerBootstrap;
1213
import io.netty.channel.ChannelInitializer;
@@ -17,7 +18,6 @@
1718
import io.netty.channel.socket.nio.NioServerSocketChannel;
1819
import io.netty.handler.codec.http.HttpRequestDecoder;
1920
import io.netty.handler.codec.http.HttpResponseEncoder;
20-
import io.netty.util.concurrent.DefaultThreadFactory;
2121

2222
import java.util.Arrays;
2323
import java.util.logging.Logger;
@@ -36,7 +36,7 @@ public Main(String... args){
3636

3737
@Override
3838
public void run(){
39-
EventLoopGroup group = new MultiThreadIoEventLoopGroup(new DefaultThreadFactory("Netty Event Loop"),NioIoHandler.newFactory());
39+
EventLoopGroup group = new MultiThreadIoEventLoopGroup(new NamedThreadFactory("Netty Event Loop"),NioIoHandler.newFactory());
4040
this.runTCPServerHTTP(group);
4141
}
4242

@@ -66,7 +66,7 @@ public static void main(String... args){
6666
Runtime.getRuntime().addShutdownHook(new Thread(GeoIP::saveCache,"Save Cache"));
6767
GeoIP.loadCache();
6868
Main.LOGGER.info("Starting finder thread for blockchain nodes");
69-
new Thread(new BlockchainNodeFinderThread(),"Block Node Finder").start();
69+
new BlockchainNodeFinderThread().run();
7070
Main.LOGGER.info("Starting finder thread for DHT nodes");
7171
new DHTNodeFinderThread().run();
7272
Main.LOGGER.info("Starting finder thread for hub nodes");

src/main/java/com/lbry/globe/thread/BlockchainNodeFinderThread.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
import java.net.InetAddress;
1414
import java.net.URI;
1515
import java.util.*;
16+
import java.util.concurrent.Executors;
17+
import java.util.concurrent.TimeUnit;
1618

19+
import com.lbry.globe.util.NamedThreadFactory;
1720
import org.json.JSONArray;
1821
import org.json.JSONObject;
1922

@@ -23,7 +26,8 @@ public class BlockchainNodeFinderThread implements Runnable{
2326
public void run(){
2427
String rpcURL = Environment.getVariable("BLOCKCHAIN_RPC_URL");
2528
if(rpcURL!=null){
26-
while(true){
29+
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Blockchain Detector")).scheduleWithFixedDelay(() -> {
30+
System.out.println("[BLOCKCHAIN] BULK PING");
2731
try{
2832
HttpURLConnection conn = (HttpURLConnection) new URI(rpcURL).toURL().openConnection();
2933
conn.setDoOutput(true);
@@ -45,12 +49,8 @@ public void run(){
4549
}catch(Exception e){
4650
e.printStackTrace();
4751
}
48-
try {
49-
Thread.sleep(10_000);
50-
} catch (InterruptedException e) {
51-
throw new RuntimeException(e);
52-
}
53-
}
52+
},0,10,TimeUnit.SECONDS);
53+
5454
}
5555
}
5656

src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.lbry.globe.object.Service;
66
import com.lbry.globe.util.DHT;
77
import com.lbry.globe.util.GeoIP;
8+
import com.lbry.globe.util.NamedThreadFactory;
89
import com.lbry.globe.util.UDP;
910

1011
import java.io.IOException;
@@ -14,7 +15,6 @@
1415
import java.util.concurrent.Executors;
1516
import java.util.concurrent.TimeUnit;
1617

17-
import io.netty.util.concurrent.DefaultThreadFactory;
1818
import org.json.JSONObject;
1919

2020
public class DHTNodeFinderThread implements Runnable{
@@ -46,7 +46,7 @@ public void run(){
4646
}
4747

4848
private void startSender(){
49-
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("DHT Sender")).scheduleWithFixedDelay(() -> {
49+
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DHT Sender")).scheduleWithFixedDelay(() -> {
5050
System.out.println("[DHT] BULK PING");
5151
API.saveNodes();
5252
for(InetSocketAddress socketAddress : DHT.getPeers().keySet()){
@@ -126,7 +126,7 @@ private void doFindNode(InetSocketAddress destination) throws IOException{
126126

127127
private void startReceiver(){
128128
new Thread(() -> {
129-
while(true) {
129+
while(DHT.getSocket().isBound()) {
130130
try {
131131
UDP.Packet receiverPacket = UDP.receive(DHT.getSocket());
132132
DHTNodeFinderThread.this.incoming.add(receiverPacket);

src/main/java/com/lbry/globe/thread/HubNodeFinderThread.java

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,19 @@
44
import com.lbry.globe.object.Node;
55
import com.lbry.globe.object.Service;
66
import com.lbry.globe.util.GeoIP;
7+
import com.lbry.globe.util.NamedThreadFactory;
78

89
import java.io.InputStream;
910
import java.net.InetAddress;
1011
import java.net.InetSocketAddress;
1112
import java.net.Socket;
1213
import java.util.*;
13-
import java.util.concurrent.CompletableFuture;
1414
import java.util.concurrent.Executors;
1515
import java.util.concurrent.TimeUnit;
1616

17-
import io.netty.util.concurrent.DefaultThreadFactory;
18-
import io.netty.util.concurrent.ThreadPerTaskExecutor;
1917
import org.json.JSONArray;
2018
import org.json.JSONObject;
2119

22-
import javax.net.SocketFactory;
23-
2420
public class HubNodeFinderThread implements Runnable{
2521

2622
public static final String[] HUBS = {
@@ -46,11 +42,17 @@ public class HubNodeFinderThread implements Runnable{
4642

4743
@Override
4844
public void run(){
49-
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("Hub Sender")).scheduleWithFixedDelay(() -> {
45+
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Hub Sender")).scheduleWithFixedDelay(() -> {
5046
System.out.println("[HUB] BULK PING");
5147
for(String hostname : HubNodeFinderThread.HUBS){
48+
InetAddress[] ips = null;
5249
try{
53-
for(InetAddress ip : InetAddress.getAllByName(hostname)){
50+
ips = InetAddress.getAllByName(hostname);
51+
}catch(Exception e){
52+
e.printStackTrace();
53+
}
54+
if(ips!=null){
55+
for(InetAddress ip : ips){
5456
if(!HubNodeFinderThread.SOCKETS.containsKey(ip)){
5557
HubNodeFinderThread.SOCKETS.put(ip,new Socket());
5658
}
@@ -69,35 +71,41 @@ public void run(){
6971
}
7072
System.out.println(" - [Hub] To: "+s.getRemoteSocketAddress());
7173

72-
JSONObject obj = new JSONObject();
73-
obj.put("id",new Random().nextInt());
74-
obj.put("method","server.banner");
75-
obj.put("params",new JSONArray());
76-
s.getOutputStream().write((obj+"\n").getBytes());
77-
s.getOutputStream().flush();
74+
try{
75+
JSONObject obj = new JSONObject();
76+
obj.put("id",new Random().nextInt());
77+
obj.put("method","server.banner");
78+
obj.put("params",new JSONArray());
79+
s.getOutputStream().write((obj+"\n").getBytes());
80+
s.getOutputStream().flush();
81+
}catch(Exception e){
82+
e.printStackTrace();
83+
}
7884
}
79-
for(InetAddress ip : InetAddress.getAllByName(hostname)){
85+
for(InetAddress ip : ips){
8086
Socket s = HubNodeFinderThread.SOCKETS.get(ip);
8187
if(s==null || !s.isConnected() || s.isClosed()){
8288
continue;
8389
}
8490
System.out.println(" - [Hub] From: "+s.getRemoteSocketAddress());
8591

86-
InputStream in = s.getInputStream();
87-
StringBuilder sb = new StringBuilder();
88-
int b;
89-
while((b = in.read())!='\n'){
90-
sb.append(new String(new byte[]{(byte) (b & 0xFF)}));
91-
}
92-
in.close();
93-
JSONObject respObj = new JSONObject(sb.toString());
94-
boolean successful = respObj.has("result") && !respObj.isNull("result");
95-
if(successful){
96-
LAST_SEEN.put(ip,System.currentTimeMillis());
92+
try{
93+
InputStream in = s.getInputStream();
94+
StringBuilder sb = new StringBuilder();
95+
int b;
96+
while((b = in.read())!='\n'){
97+
sb.append(new String(new byte[]{(byte) (b & 0xFF)}));
98+
}
99+
in.close();
100+
JSONObject respObj = new JSONObject(sb.toString());
101+
boolean successful = respObj.has("result") && !respObj.isNull("result");
102+
if(successful){
103+
LAST_SEEN.put(ip,System.currentTimeMillis());
104+
}
105+
}catch(Exception e){
106+
e.printStackTrace();
97107
}
98108
}
99-
}catch(Exception e){
100-
e.printStackTrace();
101109
}
102110
}
103111

src/main/java/com/lbry/globe/util/DHT.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.lbry.globe.util;
22

3-
import io.netty.util.concurrent.DefaultThreadFactory;
4-
53
import java.io.IOException;
64
import java.net.DatagramSocket;
75
import java.net.InetSocketAddress;
@@ -14,7 +12,7 @@ public class DHT{
1412

1513
public static byte[] NODE_ID = new byte[48];
1614

17-
private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("Timeout Future"));
15+
private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Timeout Future"));
1816
private static final TimeoutFutureManager<RPCID,UDP.Packet> futureManager = new TimeoutFutureManager<>(executor);
1917
private static final Map<InetSocketAddress,Boolean> peers = new ConcurrentHashMap<>();
2018
private static final DatagramSocket socket;
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.lbry.globe.util;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
5+
public class NamedThreadFactory implements ThreadFactory{
6+
7+
private final String name;
8+
9+
public NamedThreadFactory(String name){
10+
this.name = name;
11+
}
12+
13+
@Override
14+
public Thread newThread(Runnable r){
15+
return new Thread(r,this.name);
16+
}
17+
18+
}

0 commit comments

Comments
 (0)