Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions bin/reth/tests/commands/bitfinity_node_it.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,85 @@ async fn bitfinity_test_should_start_local_reth_node() {
assert!(reth_client.get_chain_id().await.is_ok());
}

#[tokio::test]
async fn bitfinity_test_lb_lag_check() {
// Arrange
let _log = init_logs();

let eth_server = EthImpl::new();
let (_server, eth_server_address) =
mock_eth_server_start(EthServer::into_rpc(eth_server)).await;
let (reth_client, _reth_node) =
start_reth_node(Some(format!("http://{}", eth_server_address)), None).await;

// Try `eth_lbLagCheck`
let result: String = reth_client
.single_request(
"eth_lbLagCheck".to_owned(),
ethereum_json_rpc_client::Params::Array(vec![10.into()]),
ethereum_json_rpc_client::Id::Num(1),
)
.await
.unwrap();

assert!(result.contains("ACCEPTABLE_LAG"), "{result:?}");

// Need time to generate extra blocks at `eth_server`
// Assuming `EthImpl` ticks 100ms for the each next block
let mut lag_check_ok = false;
let mut interval = tokio::time::interval(std::time::Duration::from_millis(50));
for _ in 0..100 {
interval.tick().await;

let result = reth_client
.single_request::<String>(
"eth_lbLagCheck".to_owned(),
ethereum_json_rpc_client::Params::Array(vec![5.into()]),
ethereum_json_rpc_client::Id::Num(1),
)
.await;
if let Ok(message) = result {
if message.contains("LAGGING") {
lag_check_ok = true;
break;
}
}
}

assert!(lag_check_ok);

// And should not lag with bigger acceptable delta
let result: String = reth_client
.single_request(
"eth_lbLagCheck".to_owned(),
ethereum_json_rpc_client::Params::Array(vec![1000.into()]),
ethereum_json_rpc_client::Id::Num(1),
)
.await
.unwrap();

assert!(result.contains("ACCEPTABLE_LAG"), "{result:?}");
}

#[tokio::test]
async fn bitfinity_test_lb_lag_check_fail_safe() {
let (reth_client, _reth_node) =
start_reth_node(Some("http://local_host:11".to_string()), None).await;

let message: String = reth_client
.single_request(
"eth_lbLagCheck".to_owned(),
ethereum_json_rpc_client::Params::Array(vec![1000.into()]),
ethereum_json_rpc_client::Id::Num(1),
)
.await
.unwrap();

// Response should be OK to do not break LB if source temporary not available
assert!(message.contains("ACCEPTABLE_LAG"), "{message}");
assert!(message.contains("NO_SOURCE"), "{message}");
}

#[tokio::test]
async fn bitfinity_test_node_forward_ic_or_eth_get_last_certified_block() {
// Arrange
Expand Down
37 changes: 37 additions & 0 deletions crates/rpc/rpc-eth-api/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ impl<T> FullEthApiServer for T where
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))]
pub trait EthApi<T: RpcObject, B: RpcObject, R: RpcObject, H: RpcObject> {
/// Bitfinity LB api extension
/// Handler for: `eth_lbLagCheck`
#[method(name = "lbLagCheck")]
async fn lb_lag_check(&self, accepted_lag: Option<U64>) -> RpcResult<String>;

/// Returns the protocol version encoded as a string.
#[method(name = "protocolVersion")]
async fn protocol_version(&self) -> RpcResult<U64>;
Expand Down Expand Up @@ -386,6 +391,38 @@ where
T: FullEthApi,
jsonrpsee_types::error::ErrorObject<'static>: From<T::Error>,
{
/// Handler for: `eth_lbLagCheck`
async fn lb_lag_check(&self, accepted_lag: Option<U64>) -> RpcResult<String> {
const LAG_STATUS_BAD: &str = "LAGGING";
const LAG_STATUS_OK: &str = "ACCEPTABLE_LAG";
let network_block = match BitfinityEvmRpc::network_block_number(self).await {
Ok(block) => block,
Err(e) => {
// Must not fail if rpc-url/evmc is not responding
// or it could've disable all nodes under LB
tracing::error!(target: "rpc::eth", "Failed to get block number from the network. {}", e);
return Ok(format!("{}: NO_SOURCE", LAG_STATUS_OK));
}
};

let accepted_lag = match accepted_lag {
Some(lag) => U256::from(lag),
// Assuming that lag behind for 3 block is ok
None => U256::from(3),
};

let node_block = self.block_number()?;
let lag = network_block.saturating_sub(node_block);

let status = if lag > accepted_lag { LAG_STATUS_BAD } else { LAG_STATUS_OK };

// Better to respond with string that add structure, import serde and stuff
let response =
format!("{}: lag: {} node: {} network: {}", status, lag, node_block, network_block);

Ok(response)
}

/// Handler for: `eth_protocolVersion`
async fn protocol_version(&self) -> RpcResult<U64> {
trace!(target: "rpc::eth", "Serving eth_protocolVersion");
Expand Down
19 changes: 19 additions & 0 deletions crates/rpc/rpc-eth-api/src/helpers/bitfinity_evm_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,25 @@
/// Returns the `ChainSpec`.
fn chain_spec(&self) -> Arc<ChainSpec>;

/// Returns latest block number at the network/sync source.
fn network_block_number(&self) -> impl Future<Output = RpcResult<U256>> + Send {
let chain_spec = self.chain_spec();
async move {
// TODO: Expecting that client node would be the active data sorce at this time

Check failure on line 23 in crates/rpc/rpc-eth-api/src/helpers/bitfinity_evm_rpc.rs

View workflow job for this annotation

GitHub Actions / codespell

sorce ==> source, force, sore
// it could be primary or backup URL
let (rpc_url, client) = get_client(&chain_spec)?;

let block_number = client.get_block_number().await.map_err(|e| {
internal_rpc_err(format!(
"failed to forward eth_blockNumber request to {}: {}",
rpc_url, e
))
})?;

Ok(U256::from(block_number))
}
}

/// Forwards `eth_gasPrice` calls to the Bitfinity EVM.
fn gas_price(&self) -> impl Future<Output = RpcResult<U256>> + Send {
let chain_spec = self.chain_spec();
Expand Down
Loading