Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ public List<String> getCheckpointList() {

private void deleteCheckpoint() {
if(checkTmpStore == null) {
// only occurs in mock test. TODO fix test
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,9 @@ public class CommonParameter {
@Getter
@Setter
public int jsonRpcMaxSubTopics = 1000;
@Getter
@Setter
public int jsonRpcMaxBlockFilterNum = 50000;

@Getter
@Setter
Expand Down
1 change: 1 addition & 0 deletions common/src/main/java/org/tron/core/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public class Constant {
public static final String NODE_JSONRPC_HTTP_PBFT_PORT = "node.jsonrpc.httpPBFTPort";
public static final String NODE_JSONRPC_MAX_BLOCK_RANGE = "node.jsonrpc.maxBlockRange";
public static final String NODE_JSONRPC_MAX_SUB_TOPICS = "node.jsonrpc.maxSubTopics";
public static final String NODE_JSONRPC_MAX_BLOCK_FILTER_NUM = "node.jsonrpc.maxBlockFilterNum";

public static final String NODE_DISABLED_API_LIST = "node.disabledApi";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.tron.core.exception.jsonrpc;

public class JsonRpcExceedLimitException extends JsonRpcException {

public JsonRpcExceedLimitException() {
super();
}

public JsonRpcExceedLimitException(String message) {
super(message);
}

public JsonRpcExceedLimitException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ public boolean isBusy() {
}
int queueSize = 0;
if (eventListeners == null || eventListeners.isEmpty()) {
// only occurs in mock test. TODO fix test
return false;
}
for (IPluginEventListener listener : eventListeners) {
Expand Down
6 changes: 6 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public static void clearParam() {
PARAMETER.jsonRpcHttpPBFTNodeEnable = false;
PARAMETER.jsonRpcMaxBlockRange = 5000;
PARAMETER.jsonRpcMaxSubTopics = 1000;
PARAMETER.jsonRpcMaxBlockFilterNum = 50000;
PARAMETER.nodeMetricsEnable = false;
PARAMETER.metricsStorageEnable = false;
PARAMETER.metricsPrometheusEnable = false;
Expand Down Expand Up @@ -491,6 +492,11 @@ public static void setParam(final Config config) {
config.getInt(Constant.NODE_JSONRPC_MAX_SUB_TOPICS);
}

if (config.hasPath(Constant.NODE_JSONRPC_MAX_BLOCK_FILTER_NUM)) {
PARAMETER.jsonRpcMaxBlockFilterNum =
config.getInt(Constant.NODE_JSONRPC_MAX_BLOCK_FILTER_NUM);
}

if (config.hasPath(Constant.VM_MIN_TIME_RATIO)) {
PARAMETER.minTimeRatio = config.getDouble(Constant.VM_MIN_TIME_RATIO);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void statusCheck() {
long now = System.currentTimeMillis();

if (tronNetDelegate == null) {
//only occurs in mock test. TODO fix test
// only occurs in mock test. TODO fix test
return;
}
tronNetDelegate.getActivePeer().forEach(peer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.googlecode.jsonrpc4j.JsonRpcMethod;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand All @@ -18,6 +19,7 @@
import org.tron.common.utils.ByteArray;
import org.tron.core.exception.BadItemException;
import org.tron.core.exception.ItemNotFoundException;
import org.tron.core.exception.jsonrpc.JsonRpcExceedLimitException;
import org.tron.core.exception.jsonrpc.JsonRpcInternalException;
import org.tron.core.exception.jsonrpc.JsonRpcInvalidParamsException;
import org.tron.core.exception.jsonrpc.JsonRpcInvalidRequestException;
Expand All @@ -29,6 +31,9 @@
import org.tron.core.services.jsonrpc.types.TransactionReceipt;
import org.tron.core.services.jsonrpc.types.TransactionResult;

/**
* Error code refers to https://www.quicknode.com/docs/ethereum/error-references
*/
@Component
public interface TronJsonRpc {

Expand Down Expand Up @@ -292,9 +297,10 @@ String newFilter(FilterRequest fr) throws JsonRpcInvalidParamsException,

@JsonRpcMethod("eth_newBlockFilter")
@JsonRpcErrors({
@JsonRpcError(exception = JsonRpcExceedLimitException.class, code = -32005, data = "{}"),
@JsonRpcError(exception = JsonRpcMethodNotFoundException.class, code = -32601, data = "{}"),
})
String newBlockFilter() throws JsonRpcMethodNotFoundException;
String newBlockFilter() throws JsonRpcExceedLimitException, JsonRpcMethodNotFoundException;

@JsonRpcMethod("eth_uninstallFilter")
@JsonRpcErrors({
Expand Down Expand Up @@ -472,5 +478,35 @@ public LogFilterElement(String blockHash, Long blockNum, String txId, Integer tx
}
this.removed = removed;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || this.getClass() != o.getClass()) {
return false;
}
LogFilterElement item = (LogFilterElement) o;
if (!Objects.equals(blockHash, item.blockHash)) {
return false;
}
if (!Objects.equals(transactionHash, item.transactionHash)) {
return false;
}
if (!Objects.equals(transactionIndex, item.transactionIndex)) {
return false;
}
if (!Objects.equals(logIndex, item.logIndex)) {
return false;
}
return removed == item.removed;
}

@Override
public int hashCode() {
return Objects.hash(blockHash, transactionHash, transactionIndex, logIndex, removed);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import static org.tron.core.services.jsonrpc.JsonRpcApiUtil.triggerCallContract;

import com.alibaba.fastjson.JSON;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessageV3;
import java.io.Closeable;
Expand All @@ -25,10 +27,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.bouncycastle.util.encoders.Hex;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -51,6 +53,7 @@
import org.tron.core.Wallet;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.capsule.TransactionCapsule;
import org.tron.core.config.args.Args;
import org.tron.core.db.Manager;
import org.tron.core.db2.core.Chainbase;
import org.tron.core.exception.BadItemException;
Expand All @@ -59,6 +62,7 @@
import org.tron.core.exception.HeaderNotFound;
import org.tron.core.exception.ItemNotFoundException;
import org.tron.core.exception.VMIllegalException;
import org.tron.core.exception.jsonrpc.JsonRpcExceedLimitException;
import org.tron.core.exception.jsonrpc.JsonRpcInternalException;
import org.tron.core.exception.jsonrpc.JsonRpcInvalidParamsException;
import org.tron.core.exception.jsonrpc.JsonRpcInvalidRequestException;
Expand Down Expand Up @@ -110,6 +114,17 @@ public enum RequestSource {

private static final String FILTER_NOT_FOUND = "filter not found";
public static final int EXPIRE_SECONDS = 5 * 60;
private static final int maxBlockFilterNum = Args.getInstance().getJsonRpcMaxBlockFilterNum();
private static final Cache<LogFilterElement, LogFilterElement> logElementCache =
CacheBuilder.newBuilder()
.maximumSize(300_000L) // 300s * tps(1000) * 1 log/tx ≈ 300_000
.expireAfterWrite(EXPIRE_SECONDS, TimeUnit.SECONDS)
.recordStats().build(); //LRU cache
private static final Cache<String, String> blockHashCache =
CacheBuilder.newBuilder()
.maximumSize(60_000L) // 300s * 200 block/s when syncing
.expireAfterWrite(EXPIRE_SECONDS, TimeUnit.SECONDS)
.recordStats().build(); //LRU cache
/**
* for log filter in Full Json-RPC
*/
Expand Down Expand Up @@ -181,16 +196,31 @@ public static void handleBLockFilter(BlockFilterCapsule blockFilterCapsule) {
it = getBlockFilter2ResultFull().entrySet().iterator();
}

if (!it.hasNext()) {
return;
}
final String originalBlockHash = ByteArray.toJsonHex(blockFilterCapsule.getBlockHash());
String cachedBlockHash;
try {
// compare with hashcode() first, then with equals(). If not exist, put it.
cachedBlockHash = blockHashCache.get(originalBlockHash, () -> originalBlockHash);
} catch (ExecutionException e) {
logger.error("Getting/loading blockHash from cache failed", e); // never happen
cachedBlockHash = originalBlockHash;
}
while (it.hasNext()) {
Entry<String, BlockFilterAndResult> entry = it.next();
if (entry.getValue().isExpire()) {
it.remove();
continue;
}
entry.getValue().getResult().add(ByteArray.toJsonHex(blockFilterCapsule.getBlockHash()));
entry.getValue().getResult().add(cachedBlockHash);
}
}

/**
* append LogsFilterCapsule's LogFilterElement list to each filter if matched
*/
public static void handleLogsFilter(LogsFilterCapsule logsFilterCapsule) {
Iterator<Entry<String, LogFilterAndResult>> it;

Expand Down Expand Up @@ -226,8 +256,17 @@ public static void handleLogsFilter(LogsFilterCapsule logsFilterCapsule) {
LogMatch.matchBlock(logFilter, logsFilterCapsule.getBlockNumber(),
logsFilterCapsule.getBlockHash(), logsFilterCapsule.getTxInfoList(),
logsFilterCapsule.isRemoved());
if (CollectionUtils.isNotEmpty(elements)) {
logFilterAndResult.getResult().addAll(elements);

for (LogFilterElement element : elements) {
LogFilterElement cachedElement;
try {
// compare with hashcode() first, then with equals(). If not exist, put it.
cachedElement = logElementCache.get(element, () -> element);
} catch (ExecutionException e) {
logger.error("Getting/loading LogFilterElement from cache fails", e); // never happen
cachedElement = element;
}
logFilterAndResult.getResult().add(cachedElement);
}
}
}
Expand Down Expand Up @@ -797,7 +836,7 @@ public TransactionReceipt getTransactionReceipt(String txId)
long blockNum = blockCapsule.getNum();
TransactionInfoList transactionInfoList = wallet.getTransactionInfoByBlockNum(blockNum);
long energyFee = wallet.getEnergyFee(blockCapsule.getTimeStamp());

// Find transaction context
TransactionReceipt.TransactionContext context
= findTransactionContext(transactionInfoList,
Expand All @@ -806,7 +845,7 @@ public TransactionReceipt getTransactionReceipt(String txId)
if (context == null) {
return null; // Transaction not found in block
}

return new TransactionReceipt(blockCapsule, transactionInfo, context, energyFee);
}

Expand Down Expand Up @@ -1391,7 +1430,8 @@ public String newFilter(FilterRequest fr) throws JsonRpcInvalidParamsException,
}

@Override
public String newBlockFilter() throws JsonRpcMethodNotFoundException {
public String newBlockFilter() throws JsonRpcMethodNotFoundException,
JsonRpcExceedLimitException {
disableInPBFT("eth_newBlockFilter");

Map<String, BlockFilterAndResult> blockFilter2Result;
Expand All @@ -1400,6 +1440,10 @@ public String newBlockFilter() throws JsonRpcMethodNotFoundException {
} else {
blockFilter2Result = blockFilter2ResultSolidity;
}
if (blockFilter2Result.size() >= maxBlockFilterNum) {
throw new JsonRpcExceedLimitException(
"exceed max block filters: " + maxBlockFilterNum + ", try again later");
}

BlockFilterAndResult filterAndResult = new BlockFilterAndResult();
String filterID = generateFilterId();
Expand Down Expand Up @@ -1529,6 +1573,8 @@ public static Object[] getFilterResult(String filterId, Map<String, BlockFilterA

@Override
public void close() throws IOException {
logElementCache.invalidateAll();
blockHashCache.invalidateAll();
ExecutorServiceManager.shutdownAndAwaitTermination(sectionExecutor, esName);
}

Expand Down
1 change: 1 addition & 0 deletions framework/src/main/resources/config-localtest.conf
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ node {
# httpPBFTPort = 8565
# maxBlockRange = 5000
# maxSubTopics = 1000
# maxBlockFilterNum = 30000
}

}
Expand Down
2 changes: 2 additions & 0 deletions framework/src/main/resources/config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ node {
# The maximum number of allowed topics within a topic criteria, default value is 1000,
# should be > 0, otherwise means no limit.
maxSubTopics = 1000
# Allowed maximum number for blockFilter
maxBlockFilterNum = 50000
}

# Disabled api list, it will work for http, rpc and pbft, both FullNode and SolidityNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.junit.Assert;
import org.junit.Test;
import org.tron.common.runtime.vm.DataWord;
Expand Down Expand Up @@ -220,6 +221,18 @@ public void testMatchBlock() {
List<LogFilterElement> elementList =
matchBlock(logFilter, 100, null, transactionInfoList, false);
Assert.assertEquals(1, elementList.size());

//test LogFilterElement
List<LogFilterElement> elementList2 =
matchBlock(logFilter, 100, null, transactionInfoList, false);
Assert.assertEquals(1, elementList2.size());

LogFilterElement logFilterElement1 = elementList.get(0);
LogFilterElement logFilterElement2 = elementList2.get(0);

Assert.assertEquals(logFilterElement1.hashCode(), logFilterElement2.hashCode());
Assert.assertEquals(logFilterElement1, logFilterElement2);

} catch (JsonRpcInvalidParamsException e) {
Assert.fail();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.tron.core.net.service.nodepersist;

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.tron.common.BaseTest;
import org.tron.common.utils.JsonUtil;
import org.tron.core.Constant;
import org.tron.core.capsule.BytesCapsule;
import org.tron.core.config.args.Args;


public class NodePersistServiceTest extends BaseTest {

@Resource
private NodePersistService nodePersistService;

@BeforeClass
public static void init() {
Args.setParam(new String[] {"--output-directory", dbPath(), "--debug"},
Constant.TEST_CONF);
}

@Test
public void testDbRead() {
byte[] DB_KEY_PEERS = "peers".getBytes();
DBNode dbNode = new DBNode("localhost", 3306);
List<DBNode> dbNodeList = new ArrayList<>();
dbNodeList.add(dbNode);
DBNodes dbNodes = new DBNodes();
dbNodes.setNodes(dbNodeList);
chainBaseManager.getCommonStore().put(DB_KEY_PEERS, new BytesCapsule(
JsonUtil.obj2Json(dbNodes).getBytes()));

Assert.assertEquals(1, nodePersistService.dbRead().size());
}
}
Loading