Skip to content

Commit c54a8a0

Browse files
committed
Refactor to Kademlia buckets
1 parent ae68283 commit c54a8a0

File tree

9 files changed

+221
-45
lines changed

9 files changed

+221
-45
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
<dependency>
7171
<groupId>io.netty</groupId>
7272
<artifactId>netty-all</artifactId>
73-
<version>4.2.2.Final</version>
73+
<version>4.2.7.Final</version>
7474
</dependency>
7575
<dependency>
7676
<groupId>org.json</groupId>
@@ -80,7 +80,7 @@
8080
<dependency>
8181
<groupId>org.junit.jupiter</groupId>
8282
<artifactId>junit-jupiter-engine</artifactId>
83-
<version>5.13.3</version>
83+
<version>6.0.1</version>
8484
<scope>test</scope>
8585
</dependency>
8686
</dependencies>

src/main/java/com/lbry/globe/handler/HTTPHandler.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import com.lbry.globe.Main;
44
import com.lbry.globe.api.API;
55

6+
import com.lbry.globe.kademlia.KademliaBucket;
7+
import com.lbry.globe.kademlia.KademliaTriple;
68
import com.lbry.globe.util.DHT;
79
import com.lbry.globe.util.Hex;
810
import com.lbry.globe.util.TimeoutFutureManager;
@@ -99,19 +101,21 @@ private void handleResponse(ChannelHandlerContext ctx){
99101
//STORE IS NOT SUPPORTED
100102
json.put("query",queryParts);
101103

102-
Map<InetSocketAddress,Boolean> peers = DHT.getPeers();
103-
CompletableFuture<UDP.Packet>[] futures = new CompletableFuture[peers.size()];
104+
// Using just a single bucket is wrong, but easy for now.
105+
KademliaBucket SINGLE_BUCKET = DHT.KADEMLIA.getBucket(0);
106+
107+
CompletableFuture<UDP.Packet>[] futures = new CompletableFuture[SINGLE_BUCKET.size()];
104108
int i=0;
105-
for(Map.Entry<InetSocketAddress,Boolean> entry : peers.entrySet()){
109+
for(KademliaTriple triple : SINGLE_BUCKET.getList()){
106110
try{
107111
if("ping".equals(queryParts[0])){
108-
futures[i] = DHT.ping(DHT.getSocket(),entry.getKey());
112+
futures[i] = DHT.ping(DHT.getSocket(),triple.getInetSocketAddress());
109113
}
110114
if("findNode".equals(queryParts[0])){
111-
futures[i] = DHT.findNode(DHT.getSocket(),entry.getKey(),queryParts.length>=2?Hex.decode(queryParts[1]):new byte[48]);
115+
futures[i] = DHT.findNode(DHT.getSocket(),triple.getInetSocketAddress(),queryParts.length>=2?Hex.decode(queryParts[1]):new byte[48]);
112116
}
113117
if("findValue".equals(queryParts[0])){
114-
futures[i] = DHT.findValue(DHT.getSocket(),entry.getKey(),queryParts.length>=2?Hex.decode(queryParts[1]):new byte[48]);
118+
futures[i] = DHT.findValue(DHT.getSocket(),triple.getInetSocketAddress(),queryParts.length>=2?Hex.decode(queryParts[1]):new byte[48]);
115119
}
116120
}catch(IOException ignored){}
117121
i++;
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.lbry.globe.kademlia;
2+
3+
import java.util.*;
4+
5+
public class KademliaBucket{
6+
7+
private final List<KademliaTriple> list = new ArrayList<>();
8+
private final KademliaSystem system;
9+
10+
public KademliaBucket(KademliaSystem system){
11+
this.system = system;
12+
}
13+
14+
public List<KademliaTriple> getList(){
15+
return Collections.unmodifiableList(this.list);
16+
}
17+
18+
public KademliaTriple getHead(){
19+
return !this.list.isEmpty()?this.list.getFirst():null;
20+
}
21+
22+
public KademliaTriple removeFromHead(){
23+
return this.list.removeFirst();
24+
}
25+
26+
public void insertAtTail(KademliaTriple triple){
27+
if(this.size()<this.system.getK()) {
28+
this.list.addLast(triple);
29+
}
30+
}
31+
32+
public void moveToTail(KademliaTriple triple){
33+
if(this.list.remove(triple)){
34+
this.insertAtTail(triple);
35+
}
36+
}
37+
38+
public int size(){
39+
return this.list.size();
40+
}
41+
42+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.lbry.globe.kademlia;
2+
3+
public enum KademliaRPC{
4+
PING,
5+
STORE,
6+
FIND_NODE,
7+
FIND_VALUE,
8+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.lbry.globe.kademlia;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
6+
public class KademliaSystem{
7+
8+
private final List<KademliaBucket> buckets = new ArrayList<>();
9+
10+
private int alpha;
11+
private int k;
12+
13+
private byte[] nodeID;
14+
15+
public KademliaSystem(int alpha,int k,int n){
16+
this.alpha = alpha;
17+
this.k = k;
18+
for(int i=0;i<n;i++){
19+
this.buckets.add(new KademliaBucket(this));
20+
}
21+
}
22+
23+
public int getAlpha() {
24+
return this.alpha;
25+
}
26+
27+
public int getK() {
28+
return this.k;
29+
}
30+
31+
public KademliaBucket getBucket(int i){
32+
return this.buckets.size()-1>=i?this.buckets.get(i):null;
33+
}
34+
35+
public void setK(int k){
36+
if(this.k>k){
37+
int d = this.k-k;
38+
for(KademliaBucket bucket : this.buckets){
39+
for(int i=0;i<d;i++){
40+
bucket.removeFromHead();
41+
}
42+
}
43+
return;
44+
}
45+
this.k = k;
46+
}
47+
48+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.lbry.globe.kademlia;
2+
3+
import com.lbry.globe.util.Hex;
4+
5+
import java.net.InetAddress;
6+
import java.net.InetSocketAddress;
7+
8+
public class KademliaTriple{
9+
10+
private final InetAddress ip_address;
11+
private final int udp_port;
12+
private final byte[] node_id;
13+
14+
public KademliaTriple(InetAddress ip_address,int udp_port,byte[] node_id){
15+
this.ip_address = ip_address;
16+
this.udp_port = udp_port & 0xFFFF;
17+
this.node_id = node_id;
18+
}
19+
20+
public InetAddress getIPAddress(){
21+
return this.ip_address;
22+
}
23+
24+
public int getUDPPort(){
25+
return this.udp_port;
26+
}
27+
28+
public byte[] getNodeID(){
29+
return this.node_id;
30+
}
31+
32+
public InetSocketAddress getInetSocketAddress(){
33+
return new InetSocketAddress(this.ip_address,this.udp_port & 0xFFFF);
34+
}
35+
36+
@Override
37+
public String toString() {
38+
return "KademliaTriple{" +
39+
"ip_address=" + this.ip_address +
40+
", udp_port=" + this.udp_port +
41+
", node_id=" + Hex.encode(this.node_id) +
42+
'}';
43+
}
44+
45+
}

src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.lbry.globe.thread;
22

33
import com.lbry.globe.api.API;
4+
import com.lbry.globe.kademlia.KademliaBucket;
5+
import com.lbry.globe.kademlia.KademliaTriple;
46
import com.lbry.globe.object.Node;
57
import com.lbry.globe.object.Service;
68
import com.lbry.globe.util.DHT;
@@ -18,6 +20,9 @@
1820

1921
public class DHTNodeFinderThread implements Runnable{
2022

23+
// Using just a single bucket is wrong, but easy for now.
24+
private static final KademliaBucket SINGLE_BUCKET = DHT.KADEMLIA.getBucket(0);
25+
2126
public static final String[] BOOTSTRAP = {
2227
"dht.lbry.grin.io:4444", // Grin
2328
"dht.lbry.madiator.com:4444", // Madiator
@@ -27,14 +32,22 @@ public class DHTNodeFinderThread implements Runnable{
2732
"lbrynet3.lbry.com:4444", // EU
2833
"lbrynet4.lbry.com:4444", // ASIA
2934
"dht.lizard.technology:4444", // Jack
30-
"s2.lbry.network:4444",
35+
"s2.lbry.network:4444", // LBRY Foundation
3136
};
3237

3338
@Override
3439
public void run(){
3540
for(String bootstrap : DHTNodeFinderThread.BOOTSTRAP){
3641
URI uri = URI.create("udp://"+bootstrap);
37-
DHT.getPeers().put(new InetSocketAddress(uri.getHost(),uri.getPort()),true);
42+
try{
43+
DHT.ping(DHT.getSocket(),new InetSocketAddress(uri.getHost(),uri.getPort())).thenAccept((UDP.Packet packet) -> {
44+
byte[] receivingBytes = packet.getData();
45+
DHT.Message<?> message = DHT.Message.fromBencode(receivingBytes);
46+
SINGLE_BUCKET.insertAtTail(new KademliaTriple(packet.getAddress().getAddress(),packet.getAddress().getPort(),message.getNodeID()));
47+
}).exceptionally((Throwable e) -> null);
48+
}catch(Exception e){
49+
System.out.println("Failed bootstrap ping");
50+
}
3851
}
3952

4053
this.startSender();
@@ -45,12 +58,10 @@ private void startSender(){
4558
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DHT Sender")).scheduleWithFixedDelay(() -> {
4659
System.out.println("[DHT] BULK PING");
4760
API.saveNodes();
48-
for(InetSocketAddress socketAddress : DHT.getPeers().keySet()){
49-
String hostname = socketAddress.getHostName();
50-
int port = socketAddress.getPort();
61+
for(KademliaTriple triple : SINGLE_BUCKET.getList()){
5162
try{
52-
for(InetAddress ip : InetAddress.getAllByName(hostname)){
53-
InetSocketAddress destination = new InetSocketAddress(ip,port);
63+
for(InetAddress ip : InetAddress.getAllByName(triple.getIPAddress().getHostName())){
64+
InetSocketAddress destination = new InetSocketAddress(ip,triple.getUDPPort());
5465
this.doPing(destination);
5566
}
5667
}catch(Exception e){
@@ -107,14 +118,18 @@ private void doFindNode(InetSocketAddress destination) throws IOException{
107118
for(List<Object> n : nodes){
108119
String hostname = (String) n.get(1);
109120
int port = (int) ((long) n.get(2));
110-
InetSocketAddress existingSocketAddr = null;
111-
for(InetSocketAddress addr : DHT.getPeers().keySet()){
112-
if(addr.getHostName().equals(hostname) && addr.getPort()==port){
113-
existingSocketAddr = addr;
121+
KademliaTriple existingTriple = null;
122+
for(KademliaTriple triple : SINGLE_BUCKET.getList()){
123+
if(triple.getIPAddress().getHostName().equals(hostname) && triple.getUDPPort()==port){
124+
existingTriple = triple;
114125
}
115126
}
116-
if(existingSocketAddr==null){
117-
DHT.getPeers().put(new InetSocketAddress(hostname,port),false);
127+
if(existingTriple==null){
128+
try{
129+
SINGLE_BUCKET.insertAtTail(new KademliaTriple(InetAddress.getByName(hostname),port,message.getNodeID()));
130+
}catch(Exception e){
131+
e.printStackTrace();
132+
}
118133
}
119134
}
120135
}).exceptionally((Throwable e) -> null);

src/main/java/com/lbry/globe/util/BencodeConverter.java

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,38 +4,47 @@
44
import com.dampcake.bencode.Type;
55

66
import java.nio.ByteBuffer;
7+
import java.nio.charset.StandardCharsets;
78
import java.util.List;
89
import java.util.Map;
910

1011
public class BencodeConverter{
1112

12-
private static final Bencode BENCODE = new Bencode(true);
13+
private static final Bencode BENCODE = new Bencode(StandardCharsets.US_ASCII,true);
1314

1415
public static byte[] encode(Map<String,?> map){
1516
return BencodeConverter.BENCODE.encode(map);
1617
}
1718

1819
public static Map<String,?> decode(byte[] bytes){
20+
return BencodeConverter.decode(bytes,ToStringConversion.ASCII);
21+
}
22+
23+
public static Map<String,?> decode(byte[] bytes,ToStringConversion stringConversion){
1924
// Fix invalid B-encoding
2025
if(bytes[0]=='d'){
2126
bytes[0] = 'l';
22-
}
23-
List<Object> list = BencodeConverter.BENCODE.decode(bytes,Type.LIST);
24-
for(int i=0;i<list.size();i++){
25-
if(i%2==0){
26-
list.set(i,BencodeConverter.walkAndConvertByteBufferToByteArrayOrString((list.get(i))));
27+
28+
List<Object> list = BencodeConverter.BENCODE.decode(bytes,Type.LIST);
29+
for(int i=0;i<list.size();i++){
30+
if(i%2==0){
31+
Object key = list.get(i);
32+
if(key instanceof Number){
33+
key = String.valueOf(key);
34+
}
35+
list.set(i,BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(key,stringConversion));
36+
}
2737
}
28-
}
29-
bytes = BencodeConverter.BENCODE.encode(list);
30-
if(bytes[0]=='l'){
38+
bytes = BencodeConverter.BENCODE.encode(list);
39+
3140
bytes[0] = 'd';
3241
}
3342

3443
// Normal B-decoding
35-
return BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(BencodeConverter.BENCODE.decode(bytes,Type.DICTIONARY));
44+
return BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(BencodeConverter.BENCODE.decode(bytes,Type.DICTIONARY),stringConversion);
3645
}
3746

38-
private static <V> V walkAndConvertByteBufferToByteArrayOrString(Object value){
47+
private static <V> V walkAndConvertByteBufferToByteArrayOrString(Object value,ToStringConversion stringConversion){
3948
if(value instanceof ByteBuffer){
4049
ByteBuffer bb = (ByteBuffer) value;
4150
byte[] ba = bb.array();
@@ -47,20 +56,27 @@ private static <V> V walkAndConvertByteBufferToByteArrayOrString(Object value){
4756
break;
4857
}
4958
}
50-
if(hasControlOrNonASCII){
51-
return (V) ba;
59+
if(!ToStringConversion.NONE.equals(stringConversion) && (!hasControlOrNonASCII || ToStringConversion.ALL.equals(stringConversion))){
60+
return (V) new String(ba);
5261
}
53-
return (V) new String(ba);
62+
return (V) ba;
5463
}
5564
if(value instanceof List){
56-
List<Object> l = (List<Object>) value;
57-
l.replaceAll(BencodeConverter::walkAndConvertByteBufferToByteArrayOrString);
65+
List<?> list = (List<?>) value;
66+
list.replaceAll((v) -> BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(v,stringConversion));
5867
}
5968
if(value instanceof Map){
60-
Map<Object,Object> m = (Map<Object,Object>) value;
61-
m.replaceAll((k,v) -> BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(v));
69+
Map<?,?> map = (Map<?,?>) value;
70+
map.replaceAll((k,v) -> BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(v,stringConversion));
6271
}
72+
6373
return (V) value;
6474
}
6575

76+
public enum ToStringConversion{
77+
NONE,
78+
ASCII,
79+
ALL,
80+
}
81+
6682
}

0 commit comments

Comments
 (0)