From aa682c92c3832ebd4f21af333b9f742c1c785972 Mon Sep 17 00:00:00 2001 From: chengtx01 Date: Tue, 6 May 2025 18:01:57 +0800 Subject: [PATCH 1/8] feat(net):P2P message rate limit --- .../tron/core/net/P2pEventHandlerImpl.java | 8 +++-- .../org/tron/core/net/P2pRateLimiter.java | 30 +++++++++++++++++++ .../FetchInvDataMsgHandler.java | 11 +++++++ .../SyncBlockChainMsgHandler.java | 7 +++++ .../tron/core/net/peer/PeerConnection.java | 11 +++++++ 5 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 framework/src/main/java/org/tron/core/net/P2pRateLimiter.java diff --git a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java index 795c90b4edd..90c3739a3ec 100644 --- a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java +++ b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java @@ -178,9 +178,11 @@ private void processMessage(PeerConnection peer, byte[] data) { handshakeService.processHelloMessage(peer, (HelloMessage) msg); break; case P2P_DISCONNECT: - peer.getChannel().close(); - peer.getNodeStatistics() - .nodeDisconnectedRemote(((DisconnectMessage)msg).getReason()); + if (peer.getP2pRateLimiter().tryAcquire(type.asByte())) { + peer.getChannel().close(); + peer.getNodeStatistics() + .nodeDisconnectedRemote(((DisconnectMessage)msg).getReason()); + } break; case SYNC_BLOCK_CHAIN: syncBlockChainMsgHandler.processMessage(peer, msg); diff --git a/framework/src/main/java/org/tron/core/net/P2pRateLimiter.java b/framework/src/main/java/org/tron/core/net/P2pRateLimiter.java new file mode 100644 index 00000000000..aa1c7bbeffc --- /dev/null +++ b/framework/src/main/java/org/tron/core/net/P2pRateLimiter.java @@ -0,0 +1,30 @@ +package org.tron.core.net; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.util.concurrent.RateLimiter; + +public class P2pRateLimiter { + private final Cache rateLimiters = CacheBuilder.newBuilder() + .maximumSize(256).build(); + + public void register (Byte type, double rate) { + rateLimiters.put(type, RateLimiter.create(rate)); + } + + public void acquire (Byte type) { + RateLimiter rateLimiter = rateLimiters.getIfPresent(type); + if (rateLimiter == null) { + return; + } + rateLimiter.acquire(); + } + + public boolean tryAcquire (Byte type) { + RateLimiter rateLimiter = rateLimiters.getIfPresent(type); + if (rateLimiter == null) { + return true; + } + return rateLimiter.tryAcquire(); + } +} diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java index 5415ea435e3..7d25b995414 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java @@ -55,6 +55,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep FetchInvDataMessage fetchInvDataMsg = (FetchInvDataMessage) msg; + if (peer.isNeedSyncFromUs() && !peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) { + // Discard messages that exceed the rate limit + logger.warn("{} message from peer {} exceeds the rate limit", + msg.getType(), peer.getInetSocketAddress()); + return; + } + check(peer, fetchInvDataMsg); InventoryType type = fetchInvDataMsg.getInventoryType(); @@ -156,6 +163,10 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr if (!peer.isNeedSyncFromUs()) { throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync"); } + if (fetchInvDataMsg.getHashList().size() > 100) { + throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too more blocks, size:" + + fetchInvDataMsg.getHashList().size()); + } for (Sha256Hash hash : fetchInvDataMsg.getHashList()) { long blockNum = new BlockId(hash).getNum(); long minBlockNum = diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java index 55446593bd0..4ab3c7dd172 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java @@ -31,6 +31,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep SyncBlockChainMessage syncBlockChainMessage = (SyncBlockChainMessage) msg; + if (!peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) { + // Discard messages that exceed the rate limit + logger.warn("{} message from peer {} exceeds the rate limit", + msg.getType(), peer.getInetSocketAddress()); + return; + } + if (!check(peer, syncBlockChainMessage)) { peer.disconnect(Protocol.ReasonCode.BAD_PROTOCOL); return; diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java index 2e08e105bed..d64dd731873 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -1,5 +1,9 @@ package org.tron.core.net.peer; +import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA; +import static org.tron.core.net.message.MessageTypes.P2P_DISCONNECT; +import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN; + import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.protobuf.ByteString; @@ -32,7 +36,9 @@ import org.tron.core.config.args.Args; import org.tron.core.metrics.MetricsKey; import org.tron.core.metrics.MetricsUtil; +import org.tron.core.net.P2pRateLimiter; import org.tron.core.net.TronNetDelegate; +import org.tron.core.net.message.MessageTypes; import org.tron.core.net.message.adv.InventoryMessage; import org.tron.core.net.message.adv.TransactionsMessage; import org.tron.core.net.message.base.DisconnectMessage; @@ -156,6 +162,8 @@ public class PeerConnection { @Setter @Getter private volatile boolean needSyncFromUs = true; + @Getter + private P2pRateLimiter p2pRateLimiter = new P2pRateLimiter(); public void setChannel(Channel channel) { this.channel = channel; @@ -164,6 +172,9 @@ public void setChannel(Channel channel) { } this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress()); lastInteractiveTime = System.currentTimeMillis(); + p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(), 2); + p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 1); + p2pRateLimiter.register(P2P_DISCONNECT.asByte(), 1); } public void setBlockBothHave(BlockId blockId) { From e50e11f7aaadfd5b4e21e2b01d6546a0d956d4df Mon Sep 17 00:00:00 2001 From: chengtx01 Date: Thu, 15 May 2025 19:17:46 +0800 Subject: [PATCH 2/8] feat(net):test for P2P message rate limit --- .../FetchInvDataMsgHandler.java | 3 +- .../org/tron/core/net/P2pRateLimiterTest.java | 21 ++++++++++++ .../FetchInvDataMsgHandlerTest.java | 34 +++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java index 7d25b995414..3b52b5798c8 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java @@ -41,6 +41,7 @@ public class FetchInvDataMsgHandler implements TronMsgHandler { .maximumSize(1000).expireAfterWrite(1, TimeUnit.HOURS).build(); private static final int MAX_SIZE = 1_000_000; + private static final int MAX_FETCH_SIZE = 100; @Autowired private TronNetDelegate tronNetDelegate; @Autowired @@ -163,7 +164,7 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr if (!peer.isNeedSyncFromUs()) { throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync"); } - if (fetchInvDataMsg.getHashList().size() > 100) { + if (fetchInvDataMsg.getHashList().size() > MAX_FETCH_SIZE) { throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too more blocks, size:" + fetchInvDataMsg.getHashList().size()); } diff --git a/framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java b/framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java new file mode 100644 index 00000000000..8d6a10bb167 --- /dev/null +++ b/framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java @@ -0,0 +1,21 @@ +package org.tron.core.net; + +import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA; +import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN; + +import org.junit.Assert; +import org.junit.Test; + +public class P2pRateLimiterTest { + @Test + public void test() { + P2pRateLimiter limiter = new P2pRateLimiter(); + limiter.register(SYNC_BLOCK_CHAIN.asByte(), 2); + limiter.acquire(SYNC_BLOCK_CHAIN.asByte()); + limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); + boolean ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); + Assert.assertFalse(ret); + ret = limiter.tryAcquire(FETCH_INV_DATA.asByte()); + Assert.assertTrue(ret); + } +} diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java index 5fd6d6725ba..82be8bc3d5d 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java @@ -13,6 +13,7 @@ import org.tron.common.utils.Sha256Hash; import org.tron.core.capsule.BlockCapsule; import org.tron.core.config.Parameter; +import org.tron.core.net.P2pRateLimiter; import org.tron.core.net.TronNetDelegate; import org.tron.core.net.message.adv.BlockMessage; import org.tron.core.net.message.adv.FetchInvDataMessage; @@ -21,6 +22,8 @@ import org.tron.core.net.service.adv.AdvService; import org.tron.protos.Protocol; +import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA; + public class FetchInvDataMsgHandlerTest { @Test @@ -93,4 +96,35 @@ public void testSyncFetchCheck() { Assert.assertEquals(e.getMessage(), "minBlockNum: 16000, blockNum: 10000"); } } + + @Test + public void testRateLimiter() { + BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000L); + List blockIds = new LinkedList<>(); + for (int i = 0; i <= 100; i++) { + blockIds.add(blockId); + } + FetchInvDataMessage msg = + new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK); + PeerConnection peer = Mockito.mock(PeerConnection.class); + Mockito.when(peer.isNeedSyncFromUs()).thenReturn(true); + Cache advInvSpread = CacheBuilder.newBuilder().maximumSize(100) + .expireAfterWrite(1, TimeUnit.HOURS).recordStats().build(); + Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread); + P2pRateLimiter p2pRateLimiter = new P2pRateLimiter(); + p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 1); + Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter); + FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler(); + + try { + fetchInvDataMsgHandler.processMessage(peer, msg); + } catch (Exception e) { + Assert.assertEquals("fetch too more blocks, size:101", e.getMessage()); + } + try { + fetchInvDataMsgHandler.processMessage(peer, msg); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().endsWith("rate limit")); + } + } } From c2adb7dd07a23d4d6196bcc7bc465d9fae160415 Mon Sep 17 00:00:00 2001 From: chengtx01 Date: Tue, 8 Jul 2025 17:16:28 +0800 Subject: [PATCH 3/8] fix(net):fix P2P message rate limit issue --- .../main/java/org/tron/core/net/P2pRateLimiter.java | 12 +++++++----- .../java/org/tron/core/net/peer/PeerConnection.java | 2 +- .../java/org/tron/core/net/P2pRateLimiterTest.java | 4 +++- .../messagehandler/FetchInvDataMsgHandlerTest.java | 4 ++-- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/framework/src/main/java/org/tron/core/net/P2pRateLimiter.java b/framework/src/main/java/org/tron/core/net/P2pRateLimiter.java index aa1c7bbeffc..9b36e1e5df3 100644 --- a/framework/src/main/java/org/tron/core/net/P2pRateLimiter.java +++ b/framework/src/main/java/org/tron/core/net/P2pRateLimiter.java @@ -6,13 +6,15 @@ public class P2pRateLimiter { private final Cache rateLimiters = CacheBuilder.newBuilder() - .maximumSize(256).build(); + .maximumSize(32).build(); - public void register (Byte type, double rate) { - rateLimiters.put(type, RateLimiter.create(rate)); + public void register(Byte type, double rate) { + RateLimiter rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY); + rateLimiter.setRate(rate); + rateLimiters.put(type, rateLimiter); } - public void acquire (Byte type) { + public void acquire(Byte type) { RateLimiter rateLimiter = rateLimiters.getIfPresent(type); if (rateLimiter == null) { return; @@ -20,7 +22,7 @@ public void acquire (Byte type) { rateLimiter.acquire(); } - public boolean tryAcquire (Byte type) { + public boolean tryAcquire(Byte type) { RateLimiter rateLimiter = rateLimiters.getIfPresent(type); if (rateLimiter == null) { return true; diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java index d64dd731873..b39f10494c4 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -173,7 +173,7 @@ public void setChannel(Channel channel) { this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress()); lastInteractiveTime = System.currentTimeMillis(); p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(), 2); - p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 1); + p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2); p2pRateLimiter.register(P2P_DISCONNECT.asByte(), 1); } diff --git a/framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java b/framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java index 8d6a10bb167..8a1d9c52749 100644 --- a/framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java +++ b/framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java @@ -12,8 +12,10 @@ public void test() { P2pRateLimiter limiter = new P2pRateLimiter(); limiter.register(SYNC_BLOCK_CHAIN.asByte(), 2); limiter.acquire(SYNC_BLOCK_CHAIN.asByte()); - limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); boolean ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); + Assert.assertTrue(ret); + limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); + ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); Assert.assertFalse(ret); ret = limiter.tryAcquire(FETCH_INV_DATA.asByte()); Assert.assertTrue(ret); diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java index 82be8bc3d5d..f5c7abe3c4f 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java @@ -1,5 +1,7 @@ package org.tron.core.net.messagehandler; +import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA; + import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.lang.reflect.Field; @@ -22,8 +24,6 @@ import org.tron.core.net.service.adv.AdvService; import org.tron.protos.Protocol; -import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA; - public class FetchInvDataMsgHandlerTest { @Test From 3b3ddac0b0a8d77043f4dc9e41488ea0d139d063 Mon Sep 17 00:00:00 2001 From: chengtx01 Date: Tue, 8 Jul 2025 17:21:57 +0800 Subject: [PATCH 4/8] fix(net):delete unused import --- .../src/main/java/org/tron/core/net/peer/PeerConnection.java | 1 - 1 file changed, 1 deletion(-) diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java index b39f10494c4..70a428b2165 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -38,7 +38,6 @@ import org.tron.core.metrics.MetricsUtil; import org.tron.core.net.P2pRateLimiter; import org.tron.core.net.TronNetDelegate; -import org.tron.core.net.message.MessageTypes; import org.tron.core.net.message.adv.InventoryMessage; import org.tron.core.net.message.adv.TransactionsMessage; import org.tron.core.net.message.base.DisconnectMessage; From 2454a928340002d289a53b5a51d9fbecf000fb94 Mon Sep 17 00:00:00 2001 From: chengtx01 Date: Tue, 8 Jul 2025 18:59:03 +0800 Subject: [PATCH 5/8] feat(net):optimize rate limit logic --- .../messagehandler/SyncBlockChainMsgHandler.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java index 4ab3c7dd172..71d268b22bc 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java @@ -31,13 +31,6 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep SyncBlockChainMessage syncBlockChainMessage = (SyncBlockChainMessage) msg; - if (!peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) { - // Discard messages that exceed the rate limit - logger.warn("{} message from peer {} exceeds the rate limit", - msg.getType(), peer.getInetSocketAddress()); - return; - } - if (!check(peer, syncBlockChainMessage)) { peer.disconnect(Protocol.ReasonCode.BAD_PROTOCOL); return; @@ -65,6 +58,14 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep } private boolean check(PeerConnection peer, SyncBlockChainMessage msg) throws P2pException { + if (peer.getRemainNum() > 0 + && !peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) { + // Discard messages that exceed the rate limit + logger.warn("{} message from peer {} exceeds the rate limit", + msg.getType(), peer.getInetSocketAddress()); + return false; + } + List blockIds = msg.getBlockIds(); if (CollectionUtils.isEmpty(blockIds)) { throw new P2pException(TypeEnum.BAD_MESSAGE, "SyncBlockChain blockIds is empty"); From 81c6528f382328bfe1408740f036773b4b09f7a0 Mon Sep 17 00:00:00 2001 From: chengtx01 Date: Fri, 11 Jul 2025 18:38:52 +0800 Subject: [PATCH 6/8] feat(net):Optimize rate limiting logic and add tests --- .../net/messagehandler/FetchInvDataMsgHandler.java | 14 +++++--------- .../org/tron/core/net/peer/PeerConnection.java | 10 +++++++--- .../messagehandler/FetchInvDataMsgHandlerTest.java | 7 +++++++ .../SyncBlockChainMsgHandlerTest.java | 5 +++++ 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java index 3b52b5798c8..30e200fc8e1 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java @@ -41,7 +41,6 @@ public class FetchInvDataMsgHandler implements TronMsgHandler { .maximumSize(1000).expireAfterWrite(1, TimeUnit.HOURS).build(); private static final int MAX_SIZE = 1_000_000; - private static final int MAX_FETCH_SIZE = 100; @Autowired private TronNetDelegate tronNetDelegate; @Autowired @@ -56,13 +55,6 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep FetchInvDataMessage fetchInvDataMsg = (FetchInvDataMessage) msg; - if (peer.isNeedSyncFromUs() && !peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) { - // Discard messages that exceed the rate limit - logger.warn("{} message from peer {} exceeds the rate limit", - msg.getType(), peer.getInetSocketAddress()); - return; - } - check(peer, fetchInvDataMsg); InventoryType type = fetchInvDataMsg.getInventoryType(); @@ -164,7 +156,11 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr if (!peer.isNeedSyncFromUs()) { throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync"); } - if (fetchInvDataMsg.getHashList().size() > MAX_FETCH_SIZE) { + if (!peer.getP2pRateLimiter().tryAcquire(fetchInvDataMsg.getType().asByte())) { + throw new P2pException(TypeEnum.BAD_MESSAGE, fetchInvDataMsg.getType() + + " message exceeds the rate limit"); + } + if (fetchInvDataMsg.getHashList().size() > NetConstants.MAX_BLOCK_FETCH_PER_PEER) { throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too more blocks, size:" + fetchInvDataMsg.getHashList().size()); } diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java index 70a428b2165..1d8cff3b060 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -59,6 +59,10 @@ public class PeerConnection { private static List relayNodes = Args.getInstance().getFastForwardNodes(); + private static final double SYNC_BLOCK_CHAIN_RATE = 3.0; + private static final double FETCH_INV_DATA_RATE = 3.0; + private static final double P2P_DISCONNECT_RATE = 1.0; + @Getter private PeerStatistics peerStatistics = new PeerStatistics(); @@ -171,9 +175,9 @@ public void setChannel(Channel channel) { } this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress()); lastInteractiveTime = System.currentTimeMillis(); - p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(), 2); - p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2); - p2pRateLimiter.register(P2P_DISCONNECT.asByte(), 1); + p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(), SYNC_BLOCK_CHAIN_RATE); + p2pRateLimiter.register(FETCH_INV_DATA.asByte(), FETCH_INV_DATA_RATE); + p2pRateLimiter.register(P2P_DISCONNECT.asByte(), P2P_DISCONNECT_RATE); } public void setBlockBothHave(BlockId blockId) { diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java index f5c7abe3c4f..acee2d3077d 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java @@ -58,6 +58,9 @@ public void testProcessMessage() throws Exception { Mockito.when(advService.getMessage(new Item(blockId, Protocol.Inventory.InventoryType.BLOCK))) .thenReturn(new BlockMessage(blockCapsule)); ReflectUtils.setFieldValue(fetchInvDataMsgHandler, "advService", advService); + P2pRateLimiter p2pRateLimiter = new P2pRateLimiter(); + p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2); + Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter); fetchInvDataMsgHandler.processMessage(peer, new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK)); @@ -77,6 +80,9 @@ public void testSyncFetchCheck() { Cache advInvSpread = CacheBuilder.newBuilder().maximumSize(100) .expireAfterWrite(1, TimeUnit.HOURS).recordStats().build(); Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread); + P2pRateLimiter p2pRateLimiter = new P2pRateLimiter(); + p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2); + Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter); FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler(); @@ -113,6 +119,7 @@ public void testRateLimiter() { Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread); P2pRateLimiter p2pRateLimiter = new P2pRateLimiter(); p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 1); + p2pRateLimiter.acquire(FETCH_INV_DATA.asByte()); Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter); FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler(); diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java index e654e1c9cc2..eccab2aeb00 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java @@ -55,6 +55,7 @@ public void init() throws Exception { @Test public void testProcessMessage() throws Exception { try { + peer.setRemainNum(1); handler.processMessage(peer, new SyncBlockChainMessage(new ArrayList<>())); } catch (P2pException e) { Assert.assertEquals("SyncBlockChain blockIds is empty", e.getMessage()); @@ -71,6 +72,10 @@ public void testProcessMessage() throws Exception { Assert.assertNotNull(message.toString()); Assert.assertNotNull(((BlockInventoryMessage) message).getAnswerMessage()); Assert.assertFalse(f); + method.invoke(handler, peer, message); + method.invoke(handler, peer, message); + f = (boolean)method.invoke(handler, peer, message); + Assert.assertFalse(f); Method method1 = handler.getClass().getDeclaredMethod( "getLostBlockIds", List.class, BlockId.class); From 26d5fe43fbb253762a39695db68a3664322ef0fd Mon Sep 17 00:00:00 2001 From: chengtx01 Date: Tue, 22 Jul 2025 17:02:26 +0800 Subject: [PATCH 7/8] feat(net): add configuration for p2p message rate limit --- .../tron/common/parameter/CommonParameter.java | 9 +++++++++ common/src/main/java/org/tron/core/Constant.java | 3 +++ .../org/tron/core/exception/P2pException.java | 1 + .../main/java/org/tron/core/config/args/Args.java | 15 +++++++++++++++ .../org/tron/core/net/P2pEventHandlerImpl.java | 1 + .../messagehandler/FetchInvDataMsgHandler.java | 2 +- .../org/tron/core/net/peer/PeerConnection.java | 13 ++++++------- 7 files changed, 36 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index 45893970fb0..fcf3e719d3a 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -429,6 +429,15 @@ public class CommonParameter { @Getter public int rateLimiterGlobalApiQps; @Getter + @Setter + public double rateLimiterSyncBlockChain; + @Getter + @Setter + public double rateLimiterFetchInvData; + @Getter + @Setter + public double rateLimiterDisconnect; + @Getter public DbBackupConfig dbBackupConfig; @Getter public RocksDbSettings rocksDBCustomSettings; diff --git a/common/src/main/java/org/tron/core/Constant.java b/common/src/main/java/org/tron/core/Constant.java index c5a8a02fb4e..f1701e830ed 100644 --- a/common/src/main/java/org/tron/core/Constant.java +++ b/common/src/main/java/org/tron/core/Constant.java @@ -318,6 +318,9 @@ public class Constant { public static final String RATE_LIMITER_HTTP = "rate.limiter.http"; public static final String RATE_LIMITER_RPC = "rate.limiter.rpc"; + public static final String RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN = "rate.limiter.p2p.syncBlockChain"; + public static final String RATE_LIMITER_P2P_FETCH_INV_DATA = "rate.limiter.p2p.fetchInvData"; + public static final String RATE_LIMITER_P2P_DISCONNECT = "rate.limiter.p2p.disconnect"; public static final String SEED_NODE_IP_LIST = "seed.node.ip.list"; public static final String NODE_METRICS_ENABLE = "node.metricsEnable"; diff --git a/common/src/main/java/org/tron/core/exception/P2pException.java b/common/src/main/java/org/tron/core/exception/P2pException.java index 00d82e9fbf7..eae830627c2 100644 --- a/common/src/main/java/org/tron/core/exception/P2pException.java +++ b/common/src/main/java/org/tron/core/exception/P2pException.java @@ -52,6 +52,7 @@ public enum TypeEnum { PROTOBUF_ERROR(14, "protobuf inconsistent"), BLOCK_SIGN_ERROR(15, "block sign error"), BLOCK_MERKLE_ERROR(16, "block merkle error"), + RATE_LIMIT_EXCEEDED(17, "rate limit exceeded"), DEFAULT(100, "default exception"); diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 3162360bbb9..158aa5ba9c3 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -235,6 +235,9 @@ public static void clearParam() { PARAMETER.rateLimiterGlobalQps = 50000; PARAMETER.rateLimiterGlobalIpQps = 10000; PARAMETER.rateLimiterGlobalApiQps = 1000; + PARAMETER.rateLimiterSyncBlockChain = 3.0; + PARAMETER.rateLimiterFetchInvData = 3.0; + PARAMETER.rateLimiterDisconnect = 1.0; PARAMETER.p2pDisable = false; PARAMETER.dynamicConfigEnable = false; PARAMETER.dynamicConfigCheckInterval = 600; @@ -1041,6 +1044,18 @@ public static void setParam(final Config config) { PARAMETER.rateLimiterInitialization = getRateLimiterFromConfig(config); + PARAMETER.rateLimiterSyncBlockChain = + config.hasPath(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) ? config + .getDouble(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) : 3.0; + + PARAMETER.rateLimiterFetchInvData = + config.hasPath(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) ? config + .getDouble(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) : 3.0; + + PARAMETER.rateLimiterDisconnect = + config.hasPath(Constant.RATE_LIMITER_P2P_DISCONNECT) ? config + .getDouble(Constant.RATE_LIMITER_P2P_DISCONNECT) : 1.0; + PARAMETER.changedDelegation = config.hasPath(Constant.COMMITTEE_CHANGED_DELEGATION) ? config .getInt(Constant.COMMITTEE_CHANGED_DELEGATION) : 0; diff --git a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java index 90c3739a3ec..2c631052b9d 100644 --- a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java +++ b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java @@ -261,6 +261,7 @@ private void processException(PeerConnection peer, TronMessage msg, Exception ex code = Protocol.ReasonCode.NO_SUCH_MESSAGE; break; case BAD_MESSAGE: + case RATE_LIMIT_EXCEEDED: code = Protocol.ReasonCode.BAD_PROTOCOL; break; case SYNC_FAILED: diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java index 30e200fc8e1..d2b1a82e8db 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java @@ -157,7 +157,7 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync"); } if (!peer.getP2pRateLimiter().tryAcquire(fetchInvDataMsg.getType().asByte())) { - throw new P2pException(TypeEnum.BAD_MESSAGE, fetchInvDataMsg.getType() + throw new P2pException(TypeEnum.RATE_LIMIT_EXCEEDED, fetchInvDataMsg.getType() + " message exceeds the rate limit"); } if (fetchInvDataMsg.getHashList().size() > NetConstants.MAX_BLOCK_FETCH_PER_PEER) { diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java index 1d8cff3b060..bff94d8ab42 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -59,10 +59,6 @@ public class PeerConnection { private static List relayNodes = Args.getInstance().getFastForwardNodes(); - private static final double SYNC_BLOCK_CHAIN_RATE = 3.0; - private static final double FETCH_INV_DATA_RATE = 3.0; - private static final double P2P_DISCONNECT_RATE = 1.0; - @Getter private PeerStatistics peerStatistics = new PeerStatistics(); @@ -175,9 +171,12 @@ public void setChannel(Channel channel) { } this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress()); lastInteractiveTime = System.currentTimeMillis(); - p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(), SYNC_BLOCK_CHAIN_RATE); - p2pRateLimiter.register(FETCH_INV_DATA.asByte(), FETCH_INV_DATA_RATE); - p2pRateLimiter.register(P2P_DISCONNECT.asByte(), P2P_DISCONNECT_RATE); + p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(), + Args.getInstance().getRateLimiterSyncBlockChain()); + p2pRateLimiter.register(FETCH_INV_DATA.asByte(), + Args.getInstance().getRateLimiterFetchInvData()); + p2pRateLimiter.register(P2P_DISCONNECT.asByte(), + Args.getInstance().getRateLimiterDisconnect()); } public void setBlockBothHave(BlockId blockId) { From 46b1c0935cc8f0a434841d8b6a25c58af0f12d24 Mon Sep 17 00:00:00 2001 From: chengtx01 Date: Tue, 5 Aug 2025 11:54:12 +0800 Subject: [PATCH 8/8] fix(log): fix log issue --- .../tron/core/net/messagehandler/FetchInvDataMsgHandler.java | 2 +- .../core/net/messagehandler/FetchInvDataMsgHandlerTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java index d2b1a82e8db..38cf3f2a0e2 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java @@ -161,7 +161,7 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr + " message exceeds the rate limit"); } if (fetchInvDataMsg.getHashList().size() > NetConstants.MAX_BLOCK_FETCH_PER_PEER) { - throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too more blocks, size:" + throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too many blocks, size:" + fetchInvDataMsg.getHashList().size()); } for (Sha256Hash hash : fetchInvDataMsg.getHashList()) { diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java index acee2d3077d..43036ce142a 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java @@ -126,7 +126,7 @@ public void testRateLimiter() { try { fetchInvDataMsgHandler.processMessage(peer, msg); } catch (Exception e) { - Assert.assertEquals("fetch too more blocks, size:101", e.getMessage()); + Assert.assertEquals("fetch too many blocks, size:101", e.getMessage()); } try { fetchInvDataMsgHandler.processMessage(peer, msg);