-
Notifications
You must be signed in to change notification settings - Fork 1.6k
feat(net): add rate limiting logic for P2P messages #6393
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
aa682c9
e50e11f
c2adb7d
3b3ddac
2454a92
81c6528
26d5fe4
46b1c09
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -318,6 +318,9 @@ public class Constant { | |
|
|
||
| public static final String RATE_LIMITER_HTTP = "rate.limiter.http"; | ||
| public static final String RATE_LIMITER_RPC = "rate.limiter.rpc"; | ||
| public static final String RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN = "rate.limiter.p2p.syncBlockChain"; | ||
| public static final String RATE_LIMITER_P2P_FETCH_INV_DATA = "rate.limiter.p2p.fetchInvData"; | ||
| public static final String RATE_LIMITER_P2P_DISCONNECT = "rate.limiter.p2p.disconnect"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As user can set the value, so add this configuration to config file? such as config.conf.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a repeated question; please refer to the previous response for details. |
||
|
|
||
| public static final String SEED_NODE_IP_LIST = "seed.node.ip.list"; | ||
| public static final String NODE_METRICS_ENABLE = "node.metricsEnable"; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -235,6 +235,9 @@ public static void clearParam() { | |
| PARAMETER.rateLimiterGlobalQps = 50000; | ||
| PARAMETER.rateLimiterGlobalIpQps = 10000; | ||
| PARAMETER.rateLimiterGlobalApiQps = 1000; | ||
| PARAMETER.rateLimiterSyncBlockChain = 3.0; | ||
| PARAMETER.rateLimiterFetchInvData = 3.0; | ||
| PARAMETER.rateLimiterDisconnect = 1.0; | ||
| PARAMETER.p2pDisable = false; | ||
| PARAMETER.dynamicConfigEnable = false; | ||
| PARAMETER.dynamicConfigCheckInterval = 600; | ||
|
|
@@ -1041,6 +1044,18 @@ public static void setParam(final Config config) { | |
|
|
||
| PARAMETER.rateLimiterInitialization = getRateLimiterFromConfig(config); | ||
|
|
||
| PARAMETER.rateLimiterSyncBlockChain = | ||
| config.hasPath(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) ? config | ||
| .getDouble(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) : 3.0; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a comment specifying the rate limit unit: is this per second, per minute, per hour, or something?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If no special instructions are given, the default is qps. |
||
|
|
||
| PARAMETER.rateLimiterFetchInvData = | ||
| config.hasPath(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) ? config | ||
| .getDouble(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) : 3.0; | ||
|
|
||
| PARAMETER.rateLimiterDisconnect = | ||
| config.hasPath(Constant.RATE_LIMITER_P2P_DISCONNECT) ? config | ||
| .getDouble(Constant.RATE_LIMITER_P2P_DISCONNECT) : 1.0; | ||
|
|
||
| PARAMETER.changedDelegation = | ||
| config.hasPath(Constant.COMMITTEE_CHANGED_DELEGATION) ? config | ||
| .getInt(Constant.COMMITTEE_CHANGED_DELEGATION) : 0; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -178,9 +178,11 @@ private void processMessage(PeerConnection peer, byte[] data) { | |
| handshakeService.processHelloMessage(peer, (HelloMessage) msg); | ||
| break; | ||
| case P2P_DISCONNECT: | ||
| peer.getChannel().close(); | ||
| peer.getNodeStatistics() | ||
| .nodeDisconnectedRemote(((DisconnectMessage)msg).getReason()); | ||
| if (peer.getP2pRateLimiter().tryAcquire(type.asByte())) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why disconnect need a rate limiter? As in your issue mentioned "After receiving the message, the connection will be disconnected and no response will be given." There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another question: is this rate limit set per Peer or shared by all Peers? Will this rate limiter effect the normal disconnect logic?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This rate limit is set per peer. This rate limiter does not affect the normal disconnection logic.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
To prevent the peer from sending a large number of repeated disconnect messages at once, it will not respond to the message, but will execute the disconnection logic. |
||
| peer.getChannel().close(); | ||
| peer.getNodeStatistics() | ||
| .nodeDisconnectedRemote(((DisconnectMessage)msg).getReason()); | ||
| } | ||
| break; | ||
| case SYNC_BLOCK_CHAIN: | ||
| syncBlockChainMsgHandler.processMessage(peer, msg); | ||
|
|
@@ -259,6 +261,7 @@ private void processException(PeerConnection peer, TronMessage msg, Exception ex | |
| code = Protocol.ReasonCode.NO_SUCH_MESSAGE; | ||
| break; | ||
| case BAD_MESSAGE: | ||
| case RATE_LIMIT_EXCEEDED: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This mapping doesn't seem quite right.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to normal protocol logic, RATE_LIMIT_EXCEEDED will not appear. Once it appears, it is considered a protocol violation. |
||
| code = Protocol.ReasonCode.BAD_PROTOCOL; | ||
| break; | ||
| case SYNC_FAILED: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| package org.tron.core.net; | ||
|
|
||
| import com.google.common.cache.Cache; | ||
| import com.google.common.cache.CacheBuilder; | ||
| import com.google.common.util.concurrent.RateLimiter; | ||
|
|
||
| public class P2pRateLimiter { | ||
| private final Cache<Byte, RateLimiter> rateLimiters = CacheBuilder.newBuilder() | ||
| .maximumSize(32).build(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here is a hidden size 32 limit, can make it a large one so it normally won't never hit the limit?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The message types defined by the current protocol are very limited, and the limit of 32 is sufficient for the foreseeable future. |
||
|
|
||
| public void register(Byte type, double rate) { | ||
| RateLimiter rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what about create the RateLimter with rate, such as:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The approach you mentioned has a problem: the initial capacity of the token bucket is only 1. Once the peer completes the handshake, it immediately enters the synchronization state. If the initial token count is 1, it may lead to insufficient tokens. |
||
| rateLimiter.setRate(rate); | ||
| rateLimiters.put(type, rateLimiter); | ||
| } | ||
|
|
||
| public void acquire(Byte type) { | ||
| RateLimiter rateLimiter = rateLimiters.getIfPresent(type); | ||
| if (rateLimiter == null) { | ||
| return; | ||
| } | ||
| rateLimiter.acquire(); | ||
| } | ||
|
|
||
| public boolean tryAcquire(Byte type) { | ||
| RateLimiter rateLimiter = rateLimiters.getIfPresent(type); | ||
| if (rateLimiter == null) { | ||
| return true; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any risk if return true for the unregistered type(rateLimter == null)?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unregistered types return true, meaning no speed limit. |
||
| } | ||
| return rateLimiter.tryAcquire(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -156,6 +156,14 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr | |
| if (!peer.isNeedSyncFromUs()) { | ||
| throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync"); | ||
| } | ||
| if (!peer.getP2pRateLimiter().tryAcquire(fetchInvDataMsg.getType().asByte())) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This rate limit is based on the premise of block synchronization. |
||
| throw new P2pException(TypeEnum.RATE_LIMIT_EXCEEDED, fetchInvDataMsg.getType() | ||
| + " message exceeds the rate limit"); | ||
| } | ||
| if (fetchInvDataMsg.getHashList().size() > NetConstants.MAX_BLOCK_FETCH_PER_PEER) { | ||
| throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too many blocks, size:" | ||
| + fetchInvDataMsg.getHashList().size()); | ||
| } | ||
| for (Sha256Hash hash : fetchInvDataMsg.getHashList()) { | ||
| long blockNum = new BlockId(hash).getNum(); | ||
| long minBlockNum = | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| package org.tron.core.net; | ||
|
|
||
| import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA; | ||
| import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN; | ||
|
|
||
| import org.junit.Assert; | ||
| import org.junit.Test; | ||
|
|
||
| public class P2pRateLimiterTest { | ||
| @Test | ||
| public void test() { | ||
| P2pRateLimiter limiter = new P2pRateLimiter(); | ||
| limiter.register(SYNC_BLOCK_CHAIN.asByte(), 2); | ||
| limiter.acquire(SYNC_BLOCK_CHAIN.asByte()); | ||
| boolean ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); | ||
| Assert.assertTrue(ret); | ||
| limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); | ||
| ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte()); | ||
| Assert.assertFalse(ret); | ||
| ret = limiter.tryAcquire(FETCH_INV_DATA.asByte()); | ||
| Assert.assertTrue(ret); | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.