From 1bc6bddb3c3d396c73c1a14d9ac9ac4cddc619c7 Mon Sep 17 00:00:00 2001 From: jiangyuanshu <317787106@qq.com> Date: Tue, 23 Dec 2025 22:17:51 +0800 Subject: [PATCH] Optimize memory allocation for the filter interface of JSON-RPC, eth_newFilter, eth_newBlockFilter --- .../tron/core/db2/core/SnapshotManager.java | 1 + .../common/parameter/CommonParameter.java | 3 + .../src/main/java/org/tron/core/Constant.java | 1 + .../jsonrpc/JsonRpcExceedLimitException.java | 16 +++++ .../common/logsfilter/EventPluginLoader.java | 1 + .../java/org/tron/core/config/args/Args.java | 6 ++ .../tron/core/net/peer/PeerStatusCheck.java | 2 +- .../core/services/jsonrpc/TronJsonRpc.java | 38 +++++++++++- .../services/jsonrpc/TronJsonRpcImpl.java | 60 ++++++++++++++++--- .../src/main/resources/config-localtest.conf | 1 + framework/src/main/resources/config.conf | 2 + .../core/jsonrpc/LogMatchExactlyTest.java | 13 ++++ .../nodepersist/NodePersistServiceTest.java | 40 +++++++++++++ .../core/zksnark/ShieldedReceiveTest.java | 5 ++ .../src/test/resources/config-localtest.conf | 1 + .../src/test/resources/config-test-index.conf | 1 + .../test/resources/config-test-mainnet.conf | 1 + framework/src/test/resources/config-test.conf | 1 + 18 files changed, 184 insertions(+), 9 deletions(-) create mode 100644 common/src/main/java/org/tron/core/exception/jsonrpc/JsonRpcExceedLimitException.java create mode 100644 framework/src/test/java/org/tron/core/net/service/nodepersist/NodePersistServiceTest.java diff --git a/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java b/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java index 19f110c4021..e20490d93c0 100644 --- a/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java +++ b/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java @@ -423,6 +423,7 @@ public List getCheckpointList() { private void deleteCheckpoint() { if(checkTmpStore == null) { + // only occurs in mock test. TODO fix test return; } try { diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index a602a660f8f..84dc58c48d2 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -527,6 +527,9 @@ public class CommonParameter { @Getter @Setter public int jsonRpcMaxSubTopics = 1000; + @Getter + @Setter + public int jsonRpcMaxBlockFilterNum = 50000; @Getter @Setter diff --git a/common/src/main/java/org/tron/core/Constant.java b/common/src/main/java/org/tron/core/Constant.java index f7ccb9bd507..47331808a5b 100644 --- a/common/src/main/java/org/tron/core/Constant.java +++ b/common/src/main/java/org/tron/core/Constant.java @@ -154,6 +154,7 @@ public class Constant { public static final String NODE_JSONRPC_HTTP_PBFT_PORT = "node.jsonrpc.httpPBFTPort"; public static final String NODE_JSONRPC_MAX_BLOCK_RANGE = "node.jsonrpc.maxBlockRange"; public static final String NODE_JSONRPC_MAX_SUB_TOPICS = "node.jsonrpc.maxSubTopics"; + public static final String NODE_JSONRPC_MAX_BLOCK_FILTER_NUM = "node.jsonrpc.maxBlockFilterNum"; public static final String NODE_DISABLED_API_LIST = "node.disabledApi"; diff --git a/common/src/main/java/org/tron/core/exception/jsonrpc/JsonRpcExceedLimitException.java b/common/src/main/java/org/tron/core/exception/jsonrpc/JsonRpcExceedLimitException.java new file mode 100644 index 00000000000..9e6f59d4636 --- /dev/null +++ b/common/src/main/java/org/tron/core/exception/jsonrpc/JsonRpcExceedLimitException.java @@ -0,0 +1,16 @@ +package org.tron.core.exception.jsonrpc; + +public class JsonRpcExceedLimitException extends JsonRpcException { + + public JsonRpcExceedLimitException() { + super(); + } + + public JsonRpcExceedLimitException(String message) { + super(message); + } + + public JsonRpcExceedLimitException(String message, Throwable cause) { + super(message, cause); + } +} \ No newline at end of file diff --git a/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java b/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java index 6d16be50164..7061b2e9d57 100644 --- a/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java +++ b/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java @@ -546,6 +546,7 @@ public boolean isBusy() { } int queueSize = 0; if (eventListeners == null || eventListeners.isEmpty()) { + // only occurs in mock test. TODO fix test return false; } for (IPluginEventListener listener : eventListeners) { diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 89b35d8bcd0..46695986c1f 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -207,6 +207,7 @@ public static void clearParam() { PARAMETER.jsonRpcHttpPBFTNodeEnable = false; PARAMETER.jsonRpcMaxBlockRange = 5000; PARAMETER.jsonRpcMaxSubTopics = 1000; + PARAMETER.jsonRpcMaxBlockFilterNum = 50000; PARAMETER.nodeMetricsEnable = false; PARAMETER.metricsStorageEnable = false; PARAMETER.metricsPrometheusEnable = false; @@ -491,6 +492,11 @@ public static void setParam(final Config config) { config.getInt(Constant.NODE_JSONRPC_MAX_SUB_TOPICS); } + if (config.hasPath(Constant.NODE_JSONRPC_MAX_BLOCK_FILTER_NUM)) { + PARAMETER.jsonRpcMaxBlockFilterNum = + config.getInt(Constant.NODE_JSONRPC_MAX_BLOCK_FILTER_NUM); + } + if (config.hasPath(Constant.VM_MIN_TIME_RATIO)) { PARAMETER.minTimeRatio = config.getDouble(Constant.VM_MIN_TIME_RATIO); } diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerStatusCheck.java b/framework/src/main/java/org/tron/core/net/peer/PeerStatusCheck.java index 14bd2fe1cec..04eac202484 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerStatusCheck.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerStatusCheck.java @@ -43,7 +43,7 @@ public void statusCheck() { long now = System.currentTimeMillis(); if (tronNetDelegate == null) { - //only occurs in mock test. TODO fix test + // only occurs in mock test. TODO fix test return; } tronNetDelegate.getActivePeer().forEach(peer -> { diff --git a/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpc.java b/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpc.java index 658c8051038..115df6ef9da 100644 --- a/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpc.java +++ b/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpc.java @@ -7,6 +7,7 @@ import com.googlecode.jsonrpc4j.JsonRpcMethod; import java.io.IOException; import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutionException; import lombok.AllArgsConstructor; import lombok.Getter; @@ -18,6 +19,7 @@ import org.tron.common.utils.ByteArray; import org.tron.core.exception.BadItemException; import org.tron.core.exception.ItemNotFoundException; +import org.tron.core.exception.jsonrpc.JsonRpcExceedLimitException; import org.tron.core.exception.jsonrpc.JsonRpcInternalException; import org.tron.core.exception.jsonrpc.JsonRpcInvalidParamsException; import org.tron.core.exception.jsonrpc.JsonRpcInvalidRequestException; @@ -29,6 +31,9 @@ import org.tron.core.services.jsonrpc.types.TransactionReceipt; import org.tron.core.services.jsonrpc.types.TransactionResult; +/** + * Error code refers to https://www.quicknode.com/docs/ethereum/error-references + */ @Component public interface TronJsonRpc { @@ -292,9 +297,10 @@ String newFilter(FilterRequest fr) throws JsonRpcInvalidParamsException, @JsonRpcMethod("eth_newBlockFilter") @JsonRpcErrors({ + @JsonRpcError(exception = JsonRpcExceedLimitException.class, code = -32005, data = "{}"), @JsonRpcError(exception = JsonRpcMethodNotFoundException.class, code = -32601, data = "{}"), }) - String newBlockFilter() throws JsonRpcMethodNotFoundException; + String newBlockFilter() throws JsonRpcExceedLimitException, JsonRpcMethodNotFoundException; @JsonRpcMethod("eth_uninstallFilter") @JsonRpcErrors({ @@ -472,5 +478,35 @@ public LogFilterElement(String blockHash, Long blockNum, String txId, Integer tx } this.removed = removed; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || this.getClass() != o.getClass()) { + return false; + } + LogFilterElement item = (LogFilterElement) o; + if (!Objects.equals(blockHash, item.blockHash)) { + return false; + } + if (!Objects.equals(transactionHash, item.transactionHash)) { + return false; + } + if (!Objects.equals(transactionIndex, item.transactionIndex)) { + return false; + } + if (!Objects.equals(logIndex, item.logIndex)) { + return false; + } + return removed == item.removed; + } + + @Override + public int hashCode() { + return Objects.hash(blockHash, transactionHash, transactionIndex, logIndex, removed); + } + } } diff --git a/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java b/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java index 7a1518038d6..de939bdfff4 100644 --- a/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java +++ b/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java @@ -11,6 +11,8 @@ import static org.tron.core.services.jsonrpc.JsonRpcApiUtil.triggerCallContract; import com.alibaba.fastjson.JSON; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.protobuf.ByteString; import com.google.protobuf.GeneratedMessageV3; import java.io.Closeable; @@ -25,10 +27,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.bouncycastle.util.encoders.Hex; import org.springframework.beans.factory.annotation.Autowired; @@ -51,6 +53,7 @@ import org.tron.core.Wallet; import org.tron.core.capsule.BlockCapsule; import org.tron.core.capsule.TransactionCapsule; +import org.tron.core.config.args.Args; import org.tron.core.db.Manager; import org.tron.core.db2.core.Chainbase; import org.tron.core.exception.BadItemException; @@ -59,6 +62,7 @@ import org.tron.core.exception.HeaderNotFound; import org.tron.core.exception.ItemNotFoundException; import org.tron.core.exception.VMIllegalException; +import org.tron.core.exception.jsonrpc.JsonRpcExceedLimitException; import org.tron.core.exception.jsonrpc.JsonRpcInternalException; import org.tron.core.exception.jsonrpc.JsonRpcInvalidParamsException; import org.tron.core.exception.jsonrpc.JsonRpcInvalidRequestException; @@ -110,6 +114,17 @@ public enum RequestSource { private static final String FILTER_NOT_FOUND = "filter not found"; public static final int EXPIRE_SECONDS = 5 * 60; + private static final int maxBlockFilterNum = Args.getInstance().getJsonRpcMaxBlockFilterNum(); + private static final Cache logElementCache = + CacheBuilder.newBuilder() + .maximumSize(300_000L) // 300s * tps(1000) * 1 log/tx ≈ 300_000 + .expireAfterWrite(EXPIRE_SECONDS, TimeUnit.SECONDS) + .recordStats().build(); //LRU cache + private static final Cache blockHashCache = + CacheBuilder.newBuilder() + .maximumSize(60_000L) // 300s * 200 block/s when syncing + .expireAfterWrite(EXPIRE_SECONDS, TimeUnit.SECONDS) + .recordStats().build(); //LRU cache /** * for log filter in Full Json-RPC */ @@ -181,16 +196,31 @@ public static void handleBLockFilter(BlockFilterCapsule blockFilterCapsule) { it = getBlockFilter2ResultFull().entrySet().iterator(); } + if (!it.hasNext()) { + return; + } + final String originalBlockHash = ByteArray.toJsonHex(blockFilterCapsule.getBlockHash()); + String cachedBlockHash; + try { + // compare with hashcode() first, then with equals(). If not exist, put it. + cachedBlockHash = blockHashCache.get(originalBlockHash, () -> originalBlockHash); + } catch (ExecutionException e) { + logger.error("Getting/loading blockHash from cache failed", e); // never happen + cachedBlockHash = originalBlockHash; + } while (it.hasNext()) { Entry entry = it.next(); if (entry.getValue().isExpire()) { it.remove(); continue; } - entry.getValue().getResult().add(ByteArray.toJsonHex(blockFilterCapsule.getBlockHash())); + entry.getValue().getResult().add(cachedBlockHash); } } + /** + * append LogsFilterCapsule's LogFilterElement list to each filter if matched + */ public static void handleLogsFilter(LogsFilterCapsule logsFilterCapsule) { Iterator> it; @@ -226,8 +256,17 @@ public static void handleLogsFilter(LogsFilterCapsule logsFilterCapsule) { LogMatch.matchBlock(logFilter, logsFilterCapsule.getBlockNumber(), logsFilterCapsule.getBlockHash(), logsFilterCapsule.getTxInfoList(), logsFilterCapsule.isRemoved()); - if (CollectionUtils.isNotEmpty(elements)) { - logFilterAndResult.getResult().addAll(elements); + + for (LogFilterElement element : elements) { + LogFilterElement cachedElement; + try { + // compare with hashcode() first, then with equals(). If not exist, put it. + cachedElement = logElementCache.get(element, () -> element); + } catch (ExecutionException e) { + logger.error("Getting/loading LogFilterElement from cache fails", e); // never happen + cachedElement = element; + } + logFilterAndResult.getResult().add(cachedElement); } } } @@ -797,7 +836,7 @@ public TransactionReceipt getTransactionReceipt(String txId) long blockNum = blockCapsule.getNum(); TransactionInfoList transactionInfoList = wallet.getTransactionInfoByBlockNum(blockNum); long energyFee = wallet.getEnergyFee(blockCapsule.getTimeStamp()); - + // Find transaction context TransactionReceipt.TransactionContext context = findTransactionContext(transactionInfoList, @@ -806,7 +845,7 @@ public TransactionReceipt getTransactionReceipt(String txId) if (context == null) { return null; // Transaction not found in block } - + return new TransactionReceipt(blockCapsule, transactionInfo, context, energyFee); } @@ -1391,7 +1430,8 @@ public String newFilter(FilterRequest fr) throws JsonRpcInvalidParamsException, } @Override - public String newBlockFilter() throws JsonRpcMethodNotFoundException { + public String newBlockFilter() throws JsonRpcMethodNotFoundException, + JsonRpcExceedLimitException { disableInPBFT("eth_newBlockFilter"); Map blockFilter2Result; @@ -1400,6 +1440,10 @@ public String newBlockFilter() throws JsonRpcMethodNotFoundException { } else { blockFilter2Result = blockFilter2ResultSolidity; } + if (blockFilter2Result.size() >= maxBlockFilterNum) { + throw new JsonRpcExceedLimitException( + "exceed max block filters: " + maxBlockFilterNum + ", try again later"); + } BlockFilterAndResult filterAndResult = new BlockFilterAndResult(); String filterID = generateFilterId(); @@ -1529,6 +1573,8 @@ public static Object[] getFilterResult(String filterId, Map 0, otherwise means no limit. maxSubTopics = 1000 + # Allowed maximum number for blockFilter + maxBlockFilterNum = 50000 } # Disabled api list, it will work for http, rpc and pbft, both FullNode and SolidityNode, diff --git a/framework/src/test/java/org/tron/core/jsonrpc/LogMatchExactlyTest.java b/framework/src/test/java/org/tron/core/jsonrpc/LogMatchExactlyTest.java index f55e3bc2cfa..0f9f125b74e 100644 --- a/framework/src/test/java/org/tron/core/jsonrpc/LogMatchExactlyTest.java +++ b/framework/src/test/java/org/tron/core/jsonrpc/LogMatchExactlyTest.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import org.junit.Assert; import org.junit.Test; import org.tron.common.runtime.vm.DataWord; @@ -220,6 +221,18 @@ public void testMatchBlock() { List elementList = matchBlock(logFilter, 100, null, transactionInfoList, false); Assert.assertEquals(1, elementList.size()); + + //test LogFilterElement + List elementList2 = + matchBlock(logFilter, 100, null, transactionInfoList, false); + Assert.assertEquals(1, elementList2.size()); + + LogFilterElement logFilterElement1 = elementList.get(0); + LogFilterElement logFilterElement2 = elementList2.get(0); + + Assert.assertEquals(logFilterElement1.hashCode(), logFilterElement2.hashCode()); + Assert.assertEquals(logFilterElement1, logFilterElement2); + } catch (JsonRpcInvalidParamsException e) { Assert.fail(); } diff --git a/framework/src/test/java/org/tron/core/net/service/nodepersist/NodePersistServiceTest.java b/framework/src/test/java/org/tron/core/net/service/nodepersist/NodePersistServiceTest.java new file mode 100644 index 00000000000..cd80a6b78f0 --- /dev/null +++ b/framework/src/test/java/org/tron/core/net/service/nodepersist/NodePersistServiceTest.java @@ -0,0 +1,40 @@ +package org.tron.core.net.service.nodepersist; + +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Resource; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.tron.common.BaseTest; +import org.tron.common.utils.JsonUtil; +import org.tron.core.Constant; +import org.tron.core.capsule.BytesCapsule; +import org.tron.core.config.args.Args; + + +public class NodePersistServiceTest extends BaseTest { + + @Resource + private NodePersistService nodePersistService; + + @BeforeClass + public static void init() { + Args.setParam(new String[] {"--output-directory", dbPath(), "--debug"}, + Constant.TEST_CONF); + } + + @Test + public void testDbRead() { + byte[] DB_KEY_PEERS = "peers".getBytes(); + DBNode dbNode = new DBNode("localhost", 3306); + List dbNodeList = new ArrayList<>(); + dbNodeList.add(dbNode); + DBNodes dbNodes = new DBNodes(); + dbNodes.setNodes(dbNodeList); + chainBaseManager.getCommonStore().put(DB_KEY_PEERS, new BytesCapsule( + JsonUtil.obj2Json(dbNodes).getBytes())); + + Assert.assertEquals(1, nodePersistService.dbRead().size()); + } +} diff --git a/framework/src/test/java/org/tron/core/zksnark/ShieldedReceiveTest.java b/framework/src/test/java/org/tron/core/zksnark/ShieldedReceiveTest.java index 013d58b63ca..bba23a230fc 100755 --- a/framework/src/test/java/org/tron/core/zksnark/ShieldedReceiveTest.java +++ b/framework/src/test/java/org/tron/core/zksnark/ShieldedReceiveTest.java @@ -228,6 +228,11 @@ private void updateTotalShieldedPoolValue(long valueBalance) { chainBaseManager.getDynamicPropertiesStore().saveTotalShieldedPoolValue(totalShieldedPoolValue); } + @Test + public void testIsMining() { + Assert.assertTrue(wallet.isMining()); + } + /* * test of change ShieldedTransactionFee proposal */ diff --git a/framework/src/test/resources/config-localtest.conf b/framework/src/test/resources/config-localtest.conf index 50a3f97d8b7..8049ceb6cda 100644 --- a/framework/src/test/resources/config-localtest.conf +++ b/framework/src/test/resources/config-localtest.conf @@ -167,6 +167,7 @@ node { # httpPBFTPort = 8565 # maxBlockRange = 5000 # maxSubTopics = 1000 + # maxBlockFilterNum = 30000 } } diff --git a/framework/src/test/resources/config-test-index.conf b/framework/src/test/resources/config-test-index.conf index faa2f93dc5e..3ea6b50b20c 100644 --- a/framework/src/test/resources/config-test-index.conf +++ b/framework/src/test/resources/config-test-index.conf @@ -88,6 +88,7 @@ node { httpPBFTEnable = false # maxBlockRange = 5000 # maxSubTopics = 1000 + # maxBlockFilterNum = 30000 } rpc { diff --git a/framework/src/test/resources/config-test-mainnet.conf b/framework/src/test/resources/config-test-mainnet.conf index 12acad64d8d..123c8e5d368 100644 --- a/framework/src/test/resources/config-test-mainnet.conf +++ b/framework/src/test/resources/config-test-mainnet.conf @@ -94,6 +94,7 @@ node { httpPBFTEnable = false # maxBlockRange = 5000 # maxSubTopics = 1000 + # maxBlockFilterNum = 50000 } rpc { diff --git a/framework/src/test/resources/config-test.conf b/framework/src/test/resources/config-test.conf index 673d4932601..eb4f605ab91 100644 --- a/framework/src/test/resources/config-test.conf +++ b/framework/src/test/resources/config-test.conf @@ -118,6 +118,7 @@ node { httpPBFTEnable = false # maxBlockRange = 5000 # maxSubTopics = 1000 + # maxBlockFilterNum = 30000 } # use your ipv6 address for node discovery and tcp connection, default false