diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index 45893970fb0..fcf3e719d3a 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -429,6 +429,15 @@ public class CommonParameter { @Getter public int rateLimiterGlobalApiQps; @Getter + @Setter + public double rateLimiterSyncBlockChain; + @Getter + @Setter + public double rateLimiterFetchInvData; + @Getter + @Setter + public double rateLimiterDisconnect; + @Getter public DbBackupConfig dbBackupConfig; @Getter public RocksDbSettings rocksDBCustomSettings; diff --git a/common/src/main/java/org/tron/core/Constant.java b/common/src/main/java/org/tron/core/Constant.java index c5a8a02fb4e..f1701e830ed 100644 --- a/common/src/main/java/org/tron/core/Constant.java +++ b/common/src/main/java/org/tron/core/Constant.java @@ -318,6 +318,9 @@ public class Constant { public static final String RATE_LIMITER_HTTP = "rate.limiter.http"; public static final String RATE_LIMITER_RPC = "rate.limiter.rpc"; + public static final String RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN = "rate.limiter.p2p.syncBlockChain"; + public static final String RATE_LIMITER_P2P_FETCH_INV_DATA = "rate.limiter.p2p.fetchInvData"; + public static final String RATE_LIMITER_P2P_DISCONNECT = "rate.limiter.p2p.disconnect"; public static final String SEED_NODE_IP_LIST = "seed.node.ip.list"; public static final String NODE_METRICS_ENABLE = "node.metricsEnable"; diff --git a/common/src/main/java/org/tron/core/exception/P2pException.java b/common/src/main/java/org/tron/core/exception/P2pException.java index 00d82e9fbf7..eae830627c2 100644 --- a/common/src/main/java/org/tron/core/exception/P2pException.java +++ b/common/src/main/java/org/tron/core/exception/P2pException.java @@ -52,6 +52,7 @@ public enum TypeEnum { PROTOBUF_ERROR(14, "protobuf inconsistent"), BLOCK_SIGN_ERROR(15, "block sign error"), BLOCK_MERKLE_ERROR(16, "block merkle error"), + RATE_LIMIT_EXCEEDED(17, "rate limit exceeded"), DEFAULT(100, "default exception"); diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 3162360bbb9..158aa5ba9c3 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -235,6 +235,9 @@ public static void clearParam() { PARAMETER.rateLimiterGlobalQps = 50000; PARAMETER.rateLimiterGlobalIpQps = 10000; PARAMETER.rateLimiterGlobalApiQps = 1000; + PARAMETER.rateLimiterSyncBlockChain = 3.0; + PARAMETER.rateLimiterFetchInvData = 3.0; + PARAMETER.rateLimiterDisconnect = 1.0; PARAMETER.p2pDisable = false; PARAMETER.dynamicConfigEnable = false; PARAMETER.dynamicConfigCheckInterval = 600; @@ -1041,6 +1044,18 @@ public static void setParam(final Config config) { PARAMETER.rateLimiterInitialization = getRateLimiterFromConfig(config); + PARAMETER.rateLimiterSyncBlockChain = + config.hasPath(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) ? config + .getDouble(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) : 3.0; + + PARAMETER.rateLimiterFetchInvData = + config.hasPath(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) ? config + .getDouble(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) : 3.0; + + PARAMETER.rateLimiterDisconnect = + config.hasPath(Constant.RATE_LIMITER_P2P_DISCONNECT) ? config + .getDouble(Constant.RATE_LIMITER_P2P_DISCONNECT) : 1.0; + PARAMETER.changedDelegation = config.hasPath(Constant.COMMITTEE_CHANGED_DELEGATION) ? config .getInt(Constant.COMMITTEE_CHANGED_DELEGATION) : 0; diff --git a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java index 795c90b4edd..2c631052b9d 100644 --- a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java +++ b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java @@ -178,9 +178,11 @@ private void processMessage(PeerConnection peer, byte[] data) { handshakeService.processHelloMessage(peer, (HelloMessage) msg); break; case P2P_DISCONNECT: - peer.getChannel().close(); - peer.getNodeStatistics() - .nodeDisconnectedRemote(((DisconnectMessage)msg).getReason()); + if (peer.getP2pRateLimiter().tryAcquire(type.asByte())) { + peer.getChannel().close(); + peer.getNodeStatistics() + .nodeDisconnectedRemote(((DisconnectMessage)msg).getReason()); + } break; case SYNC_BLOCK_CHAIN: syncBlockChainMsgHandler.processMessage(peer, msg); @@ -259,6 +261,7 @@ private void processException(PeerConnection peer, TronMessage msg, Exception ex code = Protocol.ReasonCode.NO_SUCH_MESSAGE; break; case BAD_MESSAGE: + case RATE_LIMIT_EXCEEDED: code = Protocol.ReasonCode.BAD_PROTOCOL; break; case SYNC_FAILED: diff --git a/framework/src/main/java/org/tron/core/net/P2pRateLimiter.java b/framework/src/main/java/org/tron/core/net/P2pRateLimiter.java new file mode 100644 index 00000000000..9b36e1e5df3 --- /dev/null +++ b/framework/src/main/java/org/tron/core/net/P2pRateLimiter.java @@ -0,0 +1,32 @@ +package org.tron.core.net; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.util.concurrent.RateLimiter; + +public class P2pRateLimiter { + private final Cache rateLimiters = CacheBuilder.newBuilder() + .maximumSize(32).build(); + + public void register(Byte type, double rate) { + RateLimiter rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY); + rateLimiter.setRate(rate); + rateLimiters.put(type, rateLimiter); + } + + public void acquire(Byte type) { + RateLimiter rateLimiter = rateLimiters.getIfPresent(type); + if (rateLimiter == null) { + return; + } + rateLimiter.acquire(); + } + + public boolean tryAcquire(Byte type) { + RateLimiter rateLimiter = rateLimiters.getIfPresent(type); + if (rateLimiter == null) { + return true; + } + return rateLimiter.tryAcquire(); + } +} diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java index 5415ea435e3..38cf3f2a0e2 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java @@ -156,6 +156,14 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr if (!peer.isNeedSyncFromUs()) { throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync"); } + if (!peer.getP2pRateLimiter().tryAcquire(fetchInvDataMsg.getType().asByte())) { + throw new P2pException(TypeEnum.RATE_LIMIT_EXCEEDED, fetchInvDataMsg.getType() + + " message exceeds the rate limit"); + } + if (fetchInvDataMsg.getHashList().size() > NetConstants.MAX_BLOCK_FETCH_PER_PEER) { + throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too many blocks, size:" + + fetchInvDataMsg.getHashList().size()); + } for (Sha256Hash hash : fetchInvDataMsg.getHashList()) { long blockNum = new BlockId(hash).getNum(); long minBlockNum = diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java index 55446593bd0..71d268b22bc 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java @@ -58,6 +58,14 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep } private boolean check(PeerConnection peer, SyncBlockChainMessage msg) throws P2pException { + if (peer.getRemainNum() > 0 + && !peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) { + // Discard messages that exceed the rate limit + logger.warn("{} message from peer {} exceeds the rate limit", + msg.getType(), peer.getInetSocketAddress()); + return false; + } + List blockIds = msg.getBlockIds(); if (CollectionUtils.isEmpty(blockIds)) { throw new P2pException(TypeEnum.BAD_MESSAGE, "SyncBlockChain blockIds is empty"); diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java index 2e08e105bed..bff94d8ab42 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -1,5 +1,9 @@ package org.tron.core.net.peer; +import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA; +import static org.tron.core.net.message.MessageTypes.P2P_DISCONNECT; +import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN; + import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.protobuf.ByteString; @@ -32,6 +36,7 @@ import org.tron.core.config.args.Args; import org.tron.core.metrics.MetricsKey; import org.tron.core.metrics.MetricsUtil; +import org.tron.core.net.P2pRateLimiter; import org.tron.core.net.TronNetDelegate; import org.tron.core.net.message.adv.InventoryMessage; import org.tron.core.net.message.adv.TransactionsMessage; @@ -156,6 +161,8 @@ public class PeerConnection { @Setter @Getter private volatile boolean needSyncFromUs = true; + @Getter + private P2pRateLimiter p2pRateLimiter = new P2pRateLimiter(); public void setChannel(Channel channel) { this.channel = channel; @@ -164,6 +171,12 @@ public void setChannel(Channel channel) { } this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress()); lastInteractiveTime = System.currentTimeMillis(); + p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(), + Args.getInstance().getRateLimiterSyncBlockChain()); + p2pRateLimiter.register(FETCH_INV_DATA.asByte(), + Args.getInstance().getRateLimiterFetchInvData()); + p2pRateLimiter.register(P2P_DISCONNECT.asByte(), + Args.getInstance().getRateLimiterDisconnect()); } public void setBlockBothHave(BlockId blockId) { diff --git a/framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java b/framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java new file mode 100644 index 00000000000..8a1d9c52749 --- /dev/null +++ b/framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java @@ -0,0 +1,23 @@ +package org.tron.core.net; + +import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA; +import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN; + +import org.junit.Assert; +import org.junit.Test; + +public class P2pRateLimiterTest { + @Test + public void test() { + P2pRateLimiter limiter = new P2pRateLimiter(); + limiter.register(SYNC_BLOCK_CHAIN.asByte(), 2); + limiter.acquire(SYNC_BLOCK_CHAIN.asByte()); + boolean ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); + Assert.assertTrue(ret); + limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); + ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); + Assert.assertFalse(ret); + ret = limiter.tryAcquire(FETCH_INV_DATA.asByte()); + Assert.assertTrue(ret); + } +} diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java index 5fd6d6725ba..43036ce142a 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java @@ -1,5 +1,7 @@ package org.tron.core.net.messagehandler; +import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA; + import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.lang.reflect.Field; @@ -13,6 +15,7 @@ import org.tron.common.utils.Sha256Hash; import org.tron.core.capsule.BlockCapsule; import org.tron.core.config.Parameter; +import org.tron.core.net.P2pRateLimiter; import org.tron.core.net.TronNetDelegate; import org.tron.core.net.message.adv.BlockMessage; import org.tron.core.net.message.adv.FetchInvDataMessage; @@ -55,6 +58,9 @@ public void testProcessMessage() throws Exception { Mockito.when(advService.getMessage(new Item(blockId, Protocol.Inventory.InventoryType.BLOCK))) .thenReturn(new BlockMessage(blockCapsule)); ReflectUtils.setFieldValue(fetchInvDataMsgHandler, "advService", advService); + P2pRateLimiter p2pRateLimiter = new P2pRateLimiter(); + p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2); + Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter); fetchInvDataMsgHandler.processMessage(peer, new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK)); @@ -74,6 +80,9 @@ public void testSyncFetchCheck() { Cache advInvSpread = CacheBuilder.newBuilder().maximumSize(100) .expireAfterWrite(1, TimeUnit.HOURS).recordStats().build(); Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread); + P2pRateLimiter p2pRateLimiter = new P2pRateLimiter(); + p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2); + Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter); FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler(); @@ -93,4 +102,36 @@ public void testSyncFetchCheck() { Assert.assertEquals(e.getMessage(), "minBlockNum: 16000, blockNum: 10000"); } } + + @Test + public void testRateLimiter() { + BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000L); + List blockIds = new LinkedList<>(); + for (int i = 0; i <= 100; i++) { + blockIds.add(blockId); + } + FetchInvDataMessage msg = + new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK); + PeerConnection peer = Mockito.mock(PeerConnection.class); + Mockito.when(peer.isNeedSyncFromUs()).thenReturn(true); + Cache advInvSpread = CacheBuilder.newBuilder().maximumSize(100) + .expireAfterWrite(1, TimeUnit.HOURS).recordStats().build(); + Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread); + P2pRateLimiter p2pRateLimiter = new P2pRateLimiter(); + p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 1); + p2pRateLimiter.acquire(FETCH_INV_DATA.asByte()); + Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter); + FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler(); + + try { + fetchInvDataMsgHandler.processMessage(peer, msg); + } catch (Exception e) { + Assert.assertEquals("fetch too many blocks, size:101", e.getMessage()); + } + try { + fetchInvDataMsgHandler.processMessage(peer, msg); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().endsWith("rate limit")); + } + } } diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java index e654e1c9cc2..eccab2aeb00 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java @@ -55,6 +55,7 @@ public void init() throws Exception { @Test public void testProcessMessage() throws Exception { try { + peer.setRemainNum(1); handler.processMessage(peer, new SyncBlockChainMessage(new ArrayList<>())); } catch (P2pException e) { Assert.assertEquals("SyncBlockChain blockIds is empty", e.getMessage()); @@ -71,6 +72,10 @@ public void testProcessMessage() throws Exception { Assert.assertNotNull(message.toString()); Assert.assertNotNull(((BlockInventoryMessage) message).getAnswerMessage()); Assert.assertFalse(f); + method.invoke(handler, peer, message); + method.invoke(handler, peer, message); + f = (boolean)method.invoke(handler, peer, message); + Assert.assertFalse(f); Method method1 = handler.getClass().getDeclaredMethod( "getLostBlockIds", List.class, BlockId.class);