diff --git a/bin/reth/tests/commands/bitfinity_node_it.rs b/bin/reth/tests/commands/bitfinity_node_it.rs index 62eb65c6a3d..df8c40d665a 100644 --- a/bin/reth/tests/commands/bitfinity_node_it.rs +++ b/bin/reth/tests/commands/bitfinity_node_it.rs @@ -49,6 +49,85 @@ async fn bitfinity_test_should_start_local_reth_node() { assert!(reth_client.get_chain_id().await.is_ok()); } +#[tokio::test] +async fn bitfinity_test_lb_lag_check() { + // Arrange + let _log = init_logs(); + + let eth_server = EthImpl::new(); + let (_server, eth_server_address) = + mock_eth_server_start(EthServer::into_rpc(eth_server)).await; + let (reth_client, _reth_node) = + start_reth_node(Some(format!("http://{}", eth_server_address)), None).await; + + // Try `eth_lbLagCheck` + let result: String = reth_client + .single_request( + "eth_lbLagCheck".to_owned(), + ethereum_json_rpc_client::Params::Array(vec![10.into()]), + ethereum_json_rpc_client::Id::Num(1), + ) + .await + .unwrap(); + + assert!(result.contains("ACCEPTABLE_LAG"), "{result:?}"); + + // Need time to generate extra blocks at `eth_server` + // Assuming `EthImpl` ticks 100ms for the each next block + let mut lag_check_ok = false; + let mut interval = tokio::time::interval(std::time::Duration::from_millis(50)); + for _ in 0..100 { + interval.tick().await; + + let result = reth_client + .single_request::( + "eth_lbLagCheck".to_owned(), + ethereum_json_rpc_client::Params::Array(vec![5.into()]), + ethereum_json_rpc_client::Id::Num(1), + ) + .await; + if let Ok(message) = result { + if message.contains("LAGGING") { + lag_check_ok = true; + break; + } + } + } + + assert!(lag_check_ok); + + // And should not lag with bigger acceptable delta + let result: String = reth_client + .single_request( + "eth_lbLagCheck".to_owned(), + ethereum_json_rpc_client::Params::Array(vec![1000.into()]), + ethereum_json_rpc_client::Id::Num(1), + ) + .await + .unwrap(); + + assert!(result.contains("ACCEPTABLE_LAG"), "{result:?}"); +} + +#[tokio::test] +async fn bitfinity_test_lb_lag_check_fail_safe() { + let (reth_client, _reth_node) = + start_reth_node(Some("http://local_host:11".to_string()), None).await; + + let message: String = reth_client + .single_request( + "eth_lbLagCheck".to_owned(), + ethereum_json_rpc_client::Params::Array(vec![1000.into()]), + ethereum_json_rpc_client::Id::Num(1), + ) + .await + .unwrap(); + + // Response should be OK to do not break LB if source temporary not available + assert!(message.contains("ACCEPTABLE_LAG"), "{message}"); + assert!(message.contains("NO_SOURCE"), "{message}"); +} + #[tokio::test] async fn bitfinity_test_node_forward_ic_or_eth_get_last_certified_block() { // Arrange diff --git a/crates/rpc/rpc-eth-api/src/core.rs b/crates/rpc/rpc-eth-api/src/core.rs index 2120db192ef..aa4f174f29c 100644 --- a/crates/rpc/rpc-eth-api/src/core.rs +++ b/crates/rpc/rpc-eth-api/src/core.rs @@ -51,6 +51,11 @@ impl FullEthApiServer for T where #[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))] #[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))] pub trait EthApi { + /// Bitfinity LB api extension + /// Handler for: `eth_lbLagCheck` + #[method(name = "lbLagCheck")] + async fn lb_lag_check(&self, accepted_lag: Option) -> RpcResult; + /// Returns the protocol version encoded as a string. #[method(name = "protocolVersion")] async fn protocol_version(&self) -> RpcResult; @@ -386,6 +391,38 @@ where T: FullEthApi, jsonrpsee_types::error::ErrorObject<'static>: From, { + /// Handler for: `eth_lbLagCheck` + async fn lb_lag_check(&self, accepted_lag: Option) -> RpcResult { + const LAG_STATUS_BAD: &str = "LAGGING"; + const LAG_STATUS_OK: &str = "ACCEPTABLE_LAG"; + let network_block = match BitfinityEvmRpc::network_block_number(self).await { + Ok(block) => block, + Err(e) => { + // Must not fail if rpc-url/evmc is not responding + // or it could've disable all nodes under LB + tracing::error!(target: "rpc::eth", "Failed to get block number from the network. {}", e); + return Ok(format!("{}: NO_SOURCE", LAG_STATUS_OK)); + } + }; + + let accepted_lag = match accepted_lag { + Some(lag) => U256::from(lag), + // Assuming that lag behind for 3 block is ok + None => U256::from(3), + }; + + let node_block = self.block_number()?; + let lag = network_block.saturating_sub(node_block); + + let status = if lag > accepted_lag { LAG_STATUS_BAD } else { LAG_STATUS_OK }; + + // Better to respond with string that add structure, import serde and stuff + let response = + format!("{}: lag: {} node: {} network: {}", status, lag, node_block, network_block); + + Ok(response) + } + /// Handler for: `eth_protocolVersion` async fn protocol_version(&self) -> RpcResult { trace!(target: "rpc::eth", "Serving eth_protocolVersion"); diff --git a/crates/rpc/rpc-eth-api/src/helpers/bitfinity_evm_rpc.rs b/crates/rpc/rpc-eth-api/src/helpers/bitfinity_evm_rpc.rs index dd18583d5bf..123ec13ccd8 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/bitfinity_evm_rpc.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/bitfinity_evm_rpc.rs @@ -16,6 +16,25 @@ pub trait BitfinityEvmRpc { /// Returns the `ChainSpec`. fn chain_spec(&self) -> Arc; + /// Returns latest block number at the network/sync source. + fn network_block_number(&self) -> impl Future> + Send { + let chain_spec = self.chain_spec(); + async move { + // TODO: Expecting that client node would be the active data sorce at this time + // it could be primary or backup URL + let (rpc_url, client) = get_client(&chain_spec)?; + + let block_number = client.get_block_number().await.map_err(|e| { + internal_rpc_err(format!( + "failed to forward eth_blockNumber request to {}: {}", + rpc_url, e + )) + })?; + + Ok(U256::from(block_number)) + } + } + /// Forwards `eth_gasPrice` calls to the Bitfinity EVM. fn gas_price(&self) -> impl Future> + Send { let chain_spec = self.chain_spec();