Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,15 @@ public class CommonParameter {
@Getter
public int rateLimiterGlobalApiQps;
@Getter
@Setter
public double rateLimiterSyncBlockChain;
@Getter
@Setter
public double rateLimiterFetchInvData;
@Getter
@Setter
public double rateLimiterDisconnect;
@Getter
public DbBackupConfig dbBackupConfig;
@Getter
public RocksDbSettings rocksDBCustomSettings;
Expand Down
3 changes: 3 additions & 0 deletions common/src/main/java/org/tron/core/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ public class Constant {

public static final String RATE_LIMITER_HTTP = "rate.limiter.http";
public static final String RATE_LIMITER_RPC = "rate.limiter.rpc";
public static final String RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN = "rate.limiter.p2p.syncBlockChain";
public static final String RATE_LIMITER_P2P_FETCH_INV_DATA = "rate.limiter.p2p.fetchInvData";
public static final String RATE_LIMITER_P2P_DISCONNECT = "rate.limiter.p2p.disconnect";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As user can set the value, so add this configuration to config file? such as config.conf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a repeated question; please refer to the previous response for details.


public static final String SEED_NODE_IP_LIST = "seed.node.ip.list";
public static final String NODE_METRICS_ENABLE = "node.metricsEnable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public enum TypeEnum {
PROTOBUF_ERROR(14, "protobuf inconsistent"),
BLOCK_SIGN_ERROR(15, "block sign error"),
BLOCK_MERKLE_ERROR(16, "block merkle error"),
RATE_LIMIT_EXCEEDED(17, "rate limit exceeded"),

DEFAULT(100, "default exception");

Expand Down
15 changes: 15 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ public static void clearParam() {
PARAMETER.rateLimiterGlobalQps = 50000;
PARAMETER.rateLimiterGlobalIpQps = 10000;
PARAMETER.rateLimiterGlobalApiQps = 1000;
PARAMETER.rateLimiterSyncBlockChain = 3.0;
PARAMETER.rateLimiterFetchInvData = 3.0;
PARAMETER.rateLimiterDisconnect = 1.0;
PARAMETER.p2pDisable = false;
PARAMETER.dynamicConfigEnable = false;
PARAMETER.dynamicConfigCheckInterval = 600;
Expand Down Expand Up @@ -1041,6 +1044,18 @@ public static void setParam(final Config config) {

PARAMETER.rateLimiterInitialization = getRateLimiterFromConfig(config);

PARAMETER.rateLimiterSyncBlockChain =
config.hasPath(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) ? config
.getDouble(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) : 3.0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment specifying the rate limit unit: is this per second, per minute, per hour, or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no special instructions are given, the default is qps.


PARAMETER.rateLimiterFetchInvData =
config.hasPath(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) ? config
.getDouble(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) : 3.0;

PARAMETER.rateLimiterDisconnect =
config.hasPath(Constant.RATE_LIMITER_P2P_DISCONNECT) ? config
.getDouble(Constant.RATE_LIMITER_P2P_DISCONNECT) : 1.0;

PARAMETER.changedDelegation =
config.hasPath(Constant.COMMITTEE_CHANGED_DELEGATION) ? config
.getInt(Constant.COMMITTEE_CHANGED_DELEGATION) : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,11 @@ private void processMessage(PeerConnection peer, byte[] data) {
handshakeService.processHelloMessage(peer, (HelloMessage) msg);
break;
case P2P_DISCONNECT:
peer.getChannel().close();
peer.getNodeStatistics()
.nodeDisconnectedRemote(((DisconnectMessage)msg).getReason());
if (peer.getP2pRateLimiter().tryAcquire(type.asByte())) {
Copy link

@Sunny6889 Sunny6889 Jul 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why disconnect need a rate limiter? As in your issue mentioned "After receiving the message, the connection will be disconnected and no response will be given."

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another question: is this rate limit set per Peer or shared by all Peers? Will this rate limiter effect the normal disconnect logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another question: is this rate limit set per Peer or shared by all Peers? Will this rate limiter effect the normal disconnect logic?

This rate limit is set per peer. This rate limiter does not affect the normal disconnection logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why disconnect need a rate limiter? As in your issue mentioned "After receiving the message, the connection will be disconnected and no response will be given."

To prevent the peer from sending a large number of repeated disconnect messages at once, it will not respond to the message, but will execute the disconnection logic.

peer.getChannel().close();
peer.getNodeStatistics()
.nodeDisconnectedRemote(((DisconnectMessage)msg).getReason());
}
break;
case SYNC_BLOCK_CHAIN:
syncBlockChainMsgHandler.processMessage(peer, msg);
Expand Down Expand Up @@ -259,6 +261,7 @@ private void processException(PeerConnection peer, TronMessage msg, Exception ex
code = Protocol.ReasonCode.NO_SUCH_MESSAGE;
break;
case BAD_MESSAGE:
case RATE_LIMIT_EXCEEDED:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mapping doesn't seem quite right. RATE_LIMIT_EXCEEDED is a server capacity issue rather than a protocol violation. Should we use a more specific error type or create a dedicated rate limit handler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to normal protocol logic, RATE_LIMIT_EXCEEDED will not appear. Once it appears, it is considered a protocol violation.

code = Protocol.ReasonCode.BAD_PROTOCOL;
break;
case SYNC_FAILED:
Expand Down
32 changes: 32 additions & 0 deletions framework/src/main/java/org/tron/core/net/P2pRateLimiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.tron.core.net;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.RateLimiter;

public class P2pRateLimiter {
private final Cache<Byte, RateLimiter> rateLimiters = CacheBuilder.newBuilder()
.maximumSize(32).build();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is a hidden size 32 limit, can make it a large one so it normally won't never hit the limit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message types defined by the current protocol are very limited, and the limit of 32 is sufficient for the foreseeable future.


public void register(Byte type, double rate) {
RateLimiter rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about create the RateLimter with rate, such as:
RateLimiter rateLimiter = RateLimiter.create(rate);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approach you mentioned has a problem: the initial capacity of the token bucket is only 1. Once the peer completes the handshake, it immediately enters the synchronization state. If the initial token count is 1, it may lead to insufficient tokens.

rateLimiter.setRate(rate);
rateLimiters.put(type, rateLimiter);
}

public void acquire(Byte type) {
RateLimiter rateLimiter = rateLimiters.getIfPresent(type);
if (rateLimiter == null) {
return;
}
rateLimiter.acquire();
}

public boolean tryAcquire(Byte type) {
RateLimiter rateLimiter = rateLimiters.getIfPresent(type);
if (rateLimiter == null) {
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any risk if return true for the unregistered type(rateLimter == null)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unregistered types return true, meaning no speed limit.

}
return rateLimiter.tryAcquire();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr
if (!peer.isNeedSyncFromUs()) {
throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync");
}
if (!peer.getP2pRateLimiter().tryAcquire(fetchInvDataMsg.getType().asByte())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumerInvToFetch task is scheduled every 30ms, but the limit FETCH_INV_DATA_RATE is 3 times/s, there will be conflict.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rate limit is based on the premise of block synchronization.

throw new P2pException(TypeEnum.RATE_LIMIT_EXCEEDED, fetchInvDataMsg.getType()
+ " message exceeds the rate limit");
}
if (fetchInvDataMsg.getHashList().size() > NetConstants.MAX_BLOCK_FETCH_PER_PEER) {
throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too many blocks, size:"
+ fetchInvDataMsg.getHashList().size());
}
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
long blockNum = new BlockId(hash).getNum();
long minBlockNum =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
}

private boolean check(PeerConnection peer, SyncBlockChainMessage msg) throws P2pException {
if (peer.getRemainNum() > 0
&& !peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) {
// Discard messages that exceed the rate limit
logger.warn("{} message from peer {} exceeds the rate limit",
msg.getType(), peer.getInetSocketAddress());
return false;
}

List<BlockId> blockIds = msg.getBlockIds();
if (CollectionUtils.isEmpty(blockIds)) {
throw new P2pException(TypeEnum.BAD_MESSAGE, "SyncBlockChain blockIds is empty");
Expand Down
13 changes: 13 additions & 0 deletions framework/src/main/java/org/tron/core/net/peer/PeerConnection.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.tron.core.net.peer;

import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA;
import static org.tron.core.net.message.MessageTypes.P2P_DISCONNECT;
import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -32,6 +36,7 @@
import org.tron.core.config.args.Args;
import org.tron.core.metrics.MetricsKey;
import org.tron.core.metrics.MetricsUtil;
import org.tron.core.net.P2pRateLimiter;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.adv.InventoryMessage;
import org.tron.core.net.message.adv.TransactionsMessage;
Expand Down Expand Up @@ -156,6 +161,8 @@ public class PeerConnection {
@Setter
@Getter
private volatile boolean needSyncFromUs = true;
@Getter
private P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();

public void setChannel(Channel channel) {
this.channel = channel;
Expand All @@ -164,6 +171,12 @@ public void setChannel(Channel channel) {
}
this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress());
lastInteractiveTime = System.currentTimeMillis();
p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(),
Args.getInstance().getRateLimiterSyncBlockChain());
p2pRateLimiter.register(FETCH_INV_DATA.asByte(),
Args.getInstance().getRateLimiterFetchInvData());
p2pRateLimiter.register(P2P_DISCONNECT.asByte(),
Args.getInstance().getRateLimiterDisconnect());
}

public void setBlockBothHave(BlockId blockId) {
Expand Down
23 changes: 23 additions & 0 deletions framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.tron.core.net;

import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA;
import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN;

import org.junit.Assert;
import org.junit.Test;

public class P2pRateLimiterTest {
@Test
public void test() {
P2pRateLimiter limiter = new P2pRateLimiter();
limiter.register(SYNC_BLOCK_CHAIN.asByte(), 2);
limiter.acquire(SYNC_BLOCK_CHAIN.asByte());
boolean ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte());
Assert.assertTrue(ret);
limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte());
ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte());
Assert.assertFalse(ret);
ret = limiter.tryAcquire(FETCH_INV_DATA.asByte());
Assert.assertTrue(ret);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.tron.core.net.messagehandler;

import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.lang.reflect.Field;
Expand All @@ -13,6 +15,7 @@
import org.tron.common.utils.Sha256Hash;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.config.Parameter;
import org.tron.core.net.P2pRateLimiter;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.adv.BlockMessage;
import org.tron.core.net.message.adv.FetchInvDataMessage;
Expand Down Expand Up @@ -55,6 +58,9 @@ public void testProcessMessage() throws Exception {
Mockito.when(advService.getMessage(new Item(blockId, Protocol.Inventory.InventoryType.BLOCK)))
.thenReturn(new BlockMessage(blockCapsule));
ReflectUtils.setFieldValue(fetchInvDataMsgHandler, "advService", advService);
P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2);
Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter);

fetchInvDataMsgHandler.processMessage(peer,
new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK));
Expand All @@ -74,6 +80,9 @@ public void testSyncFetchCheck() {
Cache<Item, Long> advInvSpread = CacheBuilder.newBuilder().maximumSize(100)
.expireAfterWrite(1, TimeUnit.HOURS).recordStats().build();
Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread);
P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2);
Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter);

FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler();

Expand All @@ -93,4 +102,36 @@ public void testSyncFetchCheck() {
Assert.assertEquals(e.getMessage(), "minBlockNum: 16000, blockNum: 10000");
}
}

@Test
public void testRateLimiter() {
BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000L);
List<Sha256Hash> blockIds = new LinkedList<>();
for (int i = 0; i <= 100; i++) {
blockIds.add(blockId);
}
FetchInvDataMessage msg =
new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK);
PeerConnection peer = Mockito.mock(PeerConnection.class);
Mockito.when(peer.isNeedSyncFromUs()).thenReturn(true);
Cache<Item, Long> advInvSpread = CacheBuilder.newBuilder().maximumSize(100)
.expireAfterWrite(1, TimeUnit.HOURS).recordStats().build();
Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread);
P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 1);
p2pRateLimiter.acquire(FETCH_INV_DATA.asByte());
Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter);
FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler();

try {
fetchInvDataMsgHandler.processMessage(peer, msg);
} catch (Exception e) {
Assert.assertEquals("fetch too many blocks, size:101", e.getMessage());
}
try {
fetchInvDataMsgHandler.processMessage(peer, msg);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().endsWith("rate limit"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void init() throws Exception {
@Test
public void testProcessMessage() throws Exception {
try {
peer.setRemainNum(1);
handler.processMessage(peer, new SyncBlockChainMessage(new ArrayList<>()));
} catch (P2pException e) {
Assert.assertEquals("SyncBlockChain blockIds is empty", e.getMessage());
Expand All @@ -71,6 +72,10 @@ public void testProcessMessage() throws Exception {
Assert.assertNotNull(message.toString());
Assert.assertNotNull(((BlockInventoryMessage) message).getAnswerMessage());
Assert.assertFalse(f);
method.invoke(handler, peer, message);
method.invoke(handler, peer, message);
f = (boolean)method.invoke(handler, peer, message);
Assert.assertFalse(f);

Method method1 = handler.getClass().getDeclaredMethod(
"getLostBlockIds", List.class, BlockId.class);
Expand Down