diff --git a/.vscode/settings.json b/.vscode/settings.json index a47cdf9..afa3b89 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,4 @@ { - "rust-analyzer.rustfmt.extraArgs": ["+nightly"] + "rust-analyzer.rustfmt.extraArgs": ["+nightly"], + "rust-analyzer.cargo.features": ["http-subscription"] } diff --git a/Cargo.lock b/Cargo.lock index 1c6d6fe..baca267 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2902,6 +2902,7 @@ dependencies = [ "alloy", "anyhow", "backon", + "futures-util", "serde_json", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 73177be..e046fe2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ backon = "1.6.0" tokio-stream = "0.1.17" thiserror = "2.0.17" tokio-util = "0.7.17" +futures-util = "0.3" tracing = { version = "0.1", optional = true } serde_json = "1.0.149" @@ -40,6 +41,7 @@ all-features = true [features] tracing = ["dep:tracing"] +http-subscription = [] [profile.release] lto = "thin" diff --git a/src/lib.rs b/src/lib.rs index 0daa9ca..e4d4866 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,3 +70,8 @@ pub use robust_provider::{ Error, IntoRobustProvider, IntoRootProvider, RobustProvider, RobustProviderBuilder, RobustSubscription, RobustSubscriptionStream, SubscriptionError, }; + +#[cfg(feature = "http-subscription")] +pub use robust_provider::{ + DEFAULT_POLL_INTERVAL, HttpPollingSubscription, HttpSubscriptionConfig, HttpSubscriptionError, +}; diff --git a/src/robust_provider/builder.rs b/src/robust_provider/builder.rs index 4fe4db5..4944ece 100644 --- a/src/robust_provider/builder.rs +++ b/src/robust_provider/builder.rs @@ -6,6 +6,9 @@ use crate::robust_provider::{ Error, IntoRootProvider, RobustProvider, subscription::DEFAULT_RECONNECT_INTERVAL, }; +#[cfg(feature = "http-subscription")] +use crate::robust_provider::http_subscription::DEFAULT_POLL_INTERVAL; + type BoxedProviderFuture = Pin, Error>> + Send>>; // RPC retry and timeout settings @@ -32,6 +35,10 @@ pub struct RobustProviderBuilder> { min_delay: Duration, reconnect_interval: Duration, subscription_buffer_capacity: usize, + #[cfg(feature = "http-subscription")] + poll_interval: Duration, + #[cfg(feature = "http-subscription")] + allow_http_subscriptions: bool, } impl> RobustProviderBuilder { @@ -50,6 +57,10 @@ impl> RobustProviderBuilder { min_delay: DEFAULT_MIN_DELAY, reconnect_interval: DEFAULT_RECONNECT_INTERVAL, subscription_buffer_capacity: DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY, + #[cfg(feature = "http-subscription")] + poll_interval: DEFAULT_POLL_INTERVAL, + #[cfg(feature = "http-subscription")] + allow_http_subscriptions: false, } } @@ -127,6 +138,56 @@ impl> RobustProviderBuilder { self } + /// Set the polling interval for HTTP-based subscriptions. + /// + /// This controls how frequently HTTP providers poll for new blocks + /// when used as subscription sources. Only relevant when + /// [`allow_http_subscriptions`](Self::allow_http_subscriptions) is enabled. + /// + /// Default is 12 seconds (approximate Ethereum mainnet block time). + /// Adjust based on your target chain's block time. + /// + /// # Feature Flag + /// + /// This method requires the `http-subscription` feature. + #[cfg(feature = "http-subscription")] + #[must_use] + pub fn poll_interval(mut self, interval: Duration) -> Self { + self.poll_interval = interval; + self + } + + /// Enable HTTP providers for subscriptions via polling. + /// + /// When enabled, HTTP providers can participate in subscriptions + /// by polling for new blocks at the configured [`poll_interval`](Self::poll_interval). + /// + /// # Trade-offs + /// + /// * **Latency**: New blocks detected with up to `poll_interval` delay + /// * **RPC Load**: Generates one RPC call per `poll_interval` + /// * **Missed Blocks**: If `poll_interval` > block time, intermediate blocks may be missed + /// + /// # Feature Flag + /// + /// This method requires the `http-subscription` feature. + /// + /// # Example + /// + /// ```rust,ignore + /// let robust = RobustProviderBuilder::new(http_provider) + /// .allow_http_subscriptions(true) + /// .poll_interval(Duration::from_secs(6)) // For faster chains + /// .build() + /// .await?; + /// ``` + #[cfg(feature = "http-subscription")] + #[must_use] + pub fn allow_http_subscriptions(mut self, allow: bool) -> Self { + self.allow_http_subscriptions = allow; + self + } + /// Build the `RobustProvider`. /// /// Final builder method: consumes the builder and returns the built [`RobustProvider`]. @@ -165,6 +226,10 @@ impl> RobustProviderBuilder { min_delay: self.min_delay, reconnect_interval: self.reconnect_interval, subscription_buffer_capacity: self.subscription_buffer_capacity, + #[cfg(feature = "http-subscription")] + poll_interval: self.poll_interval, + #[cfg(feature = "http-subscription")] + allow_http_subscriptions: self.allow_http_subscriptions, }) } } diff --git a/src/robust_provider/errors.rs b/src/robust_provider/errors.rs index e864e94..57dff92 100644 --- a/src/robust_provider/errors.rs +++ b/src/robust_provider/errors.rs @@ -102,9 +102,9 @@ impl From for Error { fn from(err: subscription::Error) -> Self { match err { subscription::Error::RpcError(e) => Error::RpcError(e), - subscription::Error::Timeout | - subscription::Error::Closed | - subscription::Error::Lagged(_) => Error::Timeout, + subscription::Error::Timeout + | subscription::Error::Closed + | subscription::Error::Lagged(_) => Error::Timeout, } } } @@ -120,9 +120,9 @@ pub(crate) fn is_retryable_error(code: i64, message: &str) -> bool { } pub(crate) fn is_block_not_found(code: i64, message: &str) -> bool { - geth::is_block_not_found(code, message) || - besu::is_block_not_found(code, message) || - anvil::is_block_not_found(code, message) + geth::is_block_not_found(code, message) + || besu::is_block_not_found(code, message) + || anvil::is_block_not_found(code, message) } pub(crate) fn is_invalid_log_filter(code: i64, message: &str) -> bool { @@ -173,14 +173,14 @@ mod geth { ( DEFAULT_ERROR_CODE, // https://github.com/ethereum/go-ethereum/blob/ef815c59a207d50668afb343811ed7ff02cc640b/eth/filters/api.go#L39-L46 - "invalid block range params" | - "block range extends beyond current head block" | - "can't specify fromBlock/toBlock with blockHash" | - "pending logs are not supported" | - "unknown block" | - "exceed max topics" | - "exceed max addresses or topics per search position" | - "filter not found" + "invalid block range params" + | "block range extends beyond current head block" + | "can't specify fromBlock/toBlock with blockHash" + | "pending logs are not supported" + | "unknown block" + | "exceed max topics" + | "exceed max addresses or topics per search position" + | "filter not found" ) ) } diff --git a/src/robust_provider/http_subscription.rs b/src/robust_provider/http_subscription.rs new file mode 100644 index 0000000..d609dfb --- /dev/null +++ b/src/robust_provider/http_subscription.rs @@ -0,0 +1,343 @@ +//! HTTP-based polling subscription for providers without pubsub support. +//! +//! This module provides a polling-based alternative to WebSocket subscriptions, +//! allowing HTTP providers to participate in block subscriptions by periodically +//! polling for new blocks. +//! +//! # Feature Flag +//! +//! This module requires the `http-subscription` feature: +//! +//! ```toml +//! robust-provider = { version = "0.2", features = ["http-subscription"] } +//! ``` +//! +//! # Example +//! +//! ```rust,no_run +//! use alloy::providers::ProviderBuilder; +//! use robust_provider::RobustProviderBuilder; +//! use std::time::Duration; +//! +//! # async fn example() -> anyhow::Result<()> { +//! let http = ProviderBuilder::new().connect_http("http://localhost:8545")?; +//! +//! let robust = RobustProviderBuilder::new(http) +//! .allow_http_subscriptions(true) +//! .poll_interval(Duration::from_secs(12)) +//! .build() +//! .await?; +//! +//! let mut subscription = robust.subscribe_blocks().await?; +//! while let Ok(block) = subscription.recv().await { +//! println!("New block: {}", block.number); +//! } +//! # Ok(()) } +//! ``` + +use std::{sync::Arc, time::Duration}; + +use crate::RobustProvider; +use alloy::{ + network::{BlockResponse, Network}, + primitives::BlockHash, + providers::Provider, + transports::{RpcError, TransportErrorKind}, +}; +use futures_util::{StreamExt, stream}; +use tokio::sync::mpsc; + +/// Default polling interval for HTTP subscriptions. +/// +/// Set to 12 seconds to match approximate Ethereum mainnet block time. +/// Adjust based on the target chain's block time. +pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(12); + +/// Default timeout for individual RPC calls during HTTP polling. +pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30); + +/// Default buffer capacity for the internal subscription channel. +pub const DEFAULT_BUFFER_CAPACITY: usize = 128; + +/// Errors specific to HTTP polling subscriptions. +#[derive(Debug, Clone, thiserror::Error)] +pub enum HttpSubscriptionError { + /// Polling operation exceeded the configured timeout. + #[error("Polling operation timed out")] + Timeout, + + /// An RPC error occurred during polling. + #[error("RPC error during polling: {0}")] + RpcError(Arc>), + + /// The subscription channel was closed. + #[error("Subscription channel closed")] + Closed, + + /// Failed to fetch block from the provider. + #[error("Block fetch failed: {0}")] + BlockFetchFailed(String), +} + +impl From> for HttpSubscriptionError { + fn from(err: RpcError) -> Self { + HttpSubscriptionError::RpcError(Arc::new(err)) + } +} + +/// Configuration for HTTP polling subscriptions. +#[derive(Debug, Clone)] +pub struct HttpSubscriptionConfig { + /// Interval between polling requests. + /// + /// Default: [`DEFAULT_POLL_INTERVAL`] (12 seconds) + pub poll_interval: Duration, + + /// Timeout for individual RPC calls. + /// + /// Default: [`DEFAULT_CALL_TIMEOUT`] (30 seconds) + pub call_timeout: Duration, + + /// Buffer size for the internal channel. + /// + /// Default: [`DEFAULT_BUFFER_CAPACITY`] (128) + pub buffer_capacity: usize, +} + +impl Default for HttpSubscriptionConfig { + fn default() -> Self { + Self { + poll_interval: DEFAULT_POLL_INTERVAL, + call_timeout: DEFAULT_CALL_TIMEOUT, + buffer_capacity: DEFAULT_BUFFER_CAPACITY, + } + } +} + +/// HTTP-based polling subscription that emulates WebSocket subscriptions +/// by polling for new blocks at regular intervals. +/// +/// This struct provides a similar interface to native WebSocket subscriptions, +/// allowing HTTP providers to participate in the subscription system. +/// +/// # How It Works +/// +/// Uses alloy's [`watch_blocks()`](alloy::providers::Provider::watch_blocks), which: +/// 1. Creates a block filter via `eth_newBlockFilter` +/// 2. Polls `eth_getFilterChanges` at `poll_interval` to get new block hashes +/// 3. Fetches full block headers for each hash +/// +/// # Trade-offs +/// +/// * **Latency**: New blocks are detected with up to `poll_interval` delay +/// * **RPC Load**: One filter poll per interval, plus one `get_block_by_hash` per new block +pub struct HttpPollingSubscription { + /// Receiver for block hashes from the poller + receiver: mpsc::Receiver, + /// Provider used to fetch block headers from hashes + provider: RobustProvider, +} + +impl HttpPollingSubscription +where + N::HeaderResponse: Clone + Send, +{ + /// Create a new HTTP polling subscription. + /// + /// Sets up a block filter and returns a subscription that polls for new blocks. + /// + /// # Arguments + /// + /// * `provider` - The HTTP provider to poll + /// * `config` - Configuration for polling behavior + /// + /// # Example + /// + /// ```rust,no_run + /// use robust_provider::{RobustProvider, RobustProviderBuilder}; + /// use robust_provider::robust_provider::http_subscription::{HttpPollingSubscription, HttpSubscriptionConfig}; + /// use alloy::providers::ProviderBuilder; + /// use std::time::Duration; + /// + /// # async fn example() -> anyhow::Result<()> { + /// let http = ProviderBuilder::new().connect_http("http://localhost:8545".parse()?)?; + /// let provider = RobustProviderBuilder::new(http).build().await?; + /// let config = HttpSubscriptionConfig { + /// poll_interval: Duration::from_secs(6), + /// ..Default::default() + /// }; + /// let mut sub = HttpPollingSubscription::new(provider, config).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn new( + provider: RobustProvider, + config: HttpSubscriptionConfig, + ) -> Result { + let (sender, receiver) = mpsc::channel(config.buffer_capacity); + + let poller = provider + .primary() + .watch_blocks() + .await + .map_err(HttpSubscriptionError::from)? + .with_poll_interval(config.poll_interval) + .with_channel_size(config.buffer_capacity); + + // Spawn a task to forward block hashes to the channel + let stream = poller.into_stream().flat_map(stream::iter); + tokio::spawn(async move { + let mut stream = stream; + let sender = sender; + while let Some(hash) = stream.next().await { + if sender.send(hash).await.is_err() { + // Receiver dropped, stop polling + break; + } + } + }); + + Ok(Self { receiver, provider }) + } + + /// Receive the next block header. + /// + /// This will block until a new block is available or an error occurs. + /// + /// # Errors + /// + /// * [`HttpSubscriptionError::Closed`] - if the subscription channel is closed. + /// * [`HttpSubscriptionError::Timeout`] - if the polling operation times out. + /// * [`HttpSubscriptionError::RpcError`] - if an RPC error occurs during polling. + /// * [`HttpSubscriptionError::BlockFetchFailed`] - if the block fetch fails. + pub async fn recv(&mut self) -> Result { + let block_hash = self.receiver.recv().await.ok_or(HttpSubscriptionError::Closed)?; + + let block = self.provider.get_block_by_hash(block_hash).await.map_err(|e| match e { + crate::Error::Timeout => HttpSubscriptionError::Timeout, + crate::Error::BlockNotFound => { + HttpSubscriptionError::BlockFetchFailed("Block not found".to_string()) + } + crate::Error::RpcError(rpc_err) => HttpSubscriptionError::RpcError(rpc_err), + })?; + Ok(block.header().clone()) + } + + /// Check if the subscription channel is empty (no pending messages). + #[must_use] + pub fn is_empty(&self) -> bool { + self.receiver.is_empty() + } +} + +impl std::fmt::Debug for HttpPollingSubscription { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HttpPollingSubscription") + .field("stream", &"") + .field("provider", &"") + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::RobustProviderBuilder; + use alloy::{ + consensus::BlockHeader, + node_bindings::Anvil, + providers::{ProviderBuilder, ext::AnvilApi}, + }; + use std::time::Duration; + + #[tokio::test] + async fn test_http_polling_config_defaults() { + let config = HttpSubscriptionConfig::default(); + assert_eq!(config.poll_interval, DEFAULT_POLL_INTERVAL); + assert_eq!(config.call_timeout, DEFAULT_CALL_TIMEOUT); + assert_eq!(config.buffer_capacity, DEFAULT_BUFFER_CAPACITY); + } + + #[tokio::test] + async fn test_http_polling_receives_new_block() -> anyhow::Result<()> { + let anvil = Anvil::new().try_spawn()?; + let root_provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + let provider = RobustProviderBuilder::new(root_provider.clone()).build().await?; + + let config = HttpSubscriptionConfig { + poll_interval: Duration::from_millis(50), + call_timeout: Duration::from_secs(5), + buffer_capacity: 16, + }; + + let mut sub = HttpPollingSubscription::new(provider, config).await?; + + // Mine a block + root_provider.anvil_mine(Some(1), None).await?; + + // Should receive the newly mined block + let result = tokio::time::timeout(Duration::from_secs(2), sub.recv()).await; + assert!(result.is_ok(), "Should receive new block within timeout"); + let block = result.unwrap()?; + assert_eq!(block.number(), 1, "Should receive block 1"); + + Ok(()) + } + + #[tokio::test] + async fn test_http_polling_receives_new_blocks() -> anyhow::Result<()> { + let anvil = Anvil::new().try_spawn()?; + let root_provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + let provider = RobustProviderBuilder::new(root_provider.clone()).build().await?; + + let config = HttpSubscriptionConfig { + poll_interval: Duration::from_millis(50), + call_timeout: Duration::from_secs(5), + buffer_capacity: 16, + }; + + let mut sub = HttpPollingSubscription::new(provider, config).await?; + + // Mine a new block + root_provider.anvil_mine(Some(1), None).await?; + + // Should receive block 1 + let block = tokio::time::timeout(Duration::from_secs(2), sub.recv()) + .await + .expect("timeout waiting for block 1") + .expect("recv error on block 1"); + assert_eq!(block.number(), 1); + + // Mine another block + root_provider.anvil_mine(Some(1), None).await?; + + // Should receive block 2 + let block = tokio::time::timeout(Duration::from_secs(2), sub.recv()) + .await + .expect("timeout waiting for block 2") + .expect("recv error on block 2"); + assert_eq!(block.number(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_http_subscription_error_types() { + // Test Timeout error + let timeout_err = HttpSubscriptionError::Timeout; + assert!(matches!(timeout_err, HttpSubscriptionError::Timeout)); + + // Test RpcError conversion + let rpc_err: RpcError = TransportErrorKind::custom_str("test error"); + let sub_err: HttpSubscriptionError = rpc_err.into(); + assert!(matches!(sub_err, HttpSubscriptionError::RpcError(_))); + + // Test Closed error + let closed_err = HttpSubscriptionError::Closed; + assert!(matches!(closed_err, HttpSubscriptionError::Closed)); + + // Test BlockFetchFailed error + let fetch_err = HttpSubscriptionError::BlockFetchFailed("test".to_string()); + assert!(matches!(fetch_err, HttpSubscriptionError::BlockFetchFailed(_))); + } +} diff --git a/src/robust_provider/mod.rs b/src/robust_provider/mod.rs index 47dcbf2..8aa00d3 100644 --- a/src/robust_provider/mod.rs +++ b/src/robust_provider/mod.rs @@ -13,15 +13,26 @@ //! //! * [`IntoRobustProvider`] - Convert types into a `RobustProvider` //! * [`IntoRootProvider`] - Convert types into an underlying root provider +//! +//! # Feature Flags +//! +//! * `http-subscription` - Enable HTTP-based polling subscriptions for providers without +//! native pubsub support mod builder; mod errors; +#[cfg(feature = "http-subscription")] +mod http_subscription; mod provider; mod provider_conversion; mod subscription; pub use builder::*; pub use errors::{CoreError, Error}; +#[cfg(feature = "http-subscription")] +pub use http_subscription::{ + DEFAULT_POLL_INTERVAL, HttpPollingSubscription, HttpSubscriptionConfig, HttpSubscriptionError, +}; pub use provider::RobustProvider; pub use provider_conversion::{IntoRobustProvider, IntoRootProvider}; pub use subscription::{ diff --git a/src/robust_provider/provider.rs b/src/robust_provider/provider.rs index 051474f..d2a858f 100644 --- a/src/robust_provider/provider.rs +++ b/src/robust_provider/provider.rs @@ -35,6 +35,9 @@ use alloy::{ use crate::{Error, block_not_found_doc, robust_provider::RobustSubscription}; +#[cfg(feature = "http-subscription")] +use crate::robust_provider::http_subscription::{HttpPollingSubscription, HttpSubscriptionConfig}; + /// Provider wrapper with built-in retry and timeout mechanisms. /// /// This wrapper around Alloy providers automatically handles retries, @@ -49,6 +52,12 @@ pub struct RobustProvider { pub(crate) min_delay: Duration, pub(crate) reconnect_interval: Duration, pub(crate) subscription_buffer_capacity: usize, + /// Polling interval for HTTP-based subscriptions. + #[cfg(feature = "http-subscription")] + pub(crate) poll_interval: Duration, + /// Whether HTTP providers can participate in subscriptions via polling. + #[cfg(feature = "http-subscription")] + pub(crate) allow_http_subscriptions: bool, } impl RobustProvider { @@ -476,6 +485,10 @@ impl RobustProvider { /// * Detects and recovers from lagged subscriptions /// * Periodically attempts to reconnect to the primary provider /// + /// When the `http-subscription` feature is enabled and + /// [`allow_http_subscriptions`](crate::RobustProviderBuilder::allow_http_subscriptions) + /// is set to `true`, HTTP providers can participate in subscriptions via polling. + /// /// This is a wrapper function for [`Provider::subscribe_blocks`]. /// /// # Errors @@ -485,6 +498,22 @@ impl RobustProvider { /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds /// `call_timeout`). pub async fn subscribe_blocks(&self) -> Result, Error> { + #[cfg(feature = "http-subscription")] + { + let primary_supports_pubsub = self.primary().client().pubsub_frontend().is_some(); + if primary_supports_pubsub { + return self.subscribe_blocks_ws().await; + } else { + return self.subscribe_blocks_http().await; + } + } + + #[cfg(not(feature = "http-subscription"))] + self.subscribe_blocks_ws().await + } + + /// Subscribe to new block headers using WebSocket with failover. + async fn subscribe_blocks_ws(&self) -> Result, Error> { let subscription = self .try_operation_with_failover( move |provider| async move { @@ -493,13 +522,73 @@ impl RobustProvider { .channel_size(self.subscription_buffer_capacity) .await }, - true, + true, // require_pubsub ) .await?; Ok(RobustSubscription::new(subscription, self.clone())) } + /// Subscribe to new block headers using HTTP polling. + /// Falls back to WebSocket if HTTP polling fails. + #[cfg(feature = "http-subscription")] + async fn subscribe_blocks_http(&self) -> Result, Error> { + use crate::robust_provider::http_subscription::HttpSubscriptionError; + + if !self.allow_http_subscriptions { + return self.subscribe_blocks_ws().await; + } + + let config = HttpSubscriptionConfig { + poll_interval: self.poll_interval, + call_timeout: self.call_timeout, + buffer_capacity: self.subscription_buffer_capacity, + }; + + info!( + poll_interval_ms = self.poll_interval.as_millis(), + "Starting HTTP polling subscription on primary provider" + ); + + // Try HTTP polling on primary first + let http_sub_result = HttpPollingSubscription::new(self.clone(), config.clone()).await; + + if let Ok(http_sub) = http_sub_result { + return Ok(RobustSubscription::new_http(http_sub, self.clone(), config)); + } + + // Track the last error for proper error reporting + let last_error: Option = http_sub_result.err(); + + warn!("HTTP subscription on primary failed, trying fallback providers"); + + // Primary HTTP subscription failed, try WebSocket on fallback providers + for (fallback_idx, provider) in self.fallback_providers().iter().enumerate() { + // Try WebSocket subscription if supported + if provider.client().pubsub_frontend().is_some() { + let operation = move |p: RootProvider| async move { + p.subscribe_blocks().channel_size(self.subscription_buffer_capacity).await + }; + + if let Ok(sub) = self.try_provider_with_timeout(provider, &operation).await { + info!( + fallback_index = fallback_idx, + "Subscription switched to fallback provider (WebSocket)" + ); + return Ok(RobustSubscription::new(sub, self.clone())); + } + } + } + + // All providers exhausted - return the actual error instead of generic Timeout + Err(match last_error { + Some(HttpSubscriptionError::RpcError(e)) => Error::RpcError(e), + Some(HttpSubscriptionError::Timeout) => Error::Timeout, + Some(e) => Error::RpcError(std::sync::Arc::new(RpcError::LocalUsageError(Box::new(e)))), + None => Error::Timeout, + }) + } + /// Execute `operation` with exponential backoff and a total timeout. /// /// Wraps the retry logic with [`tokio::time::timeout`] so @@ -676,6 +765,10 @@ mod tests { min_delay: Duration::from_millis(min_delay), reconnect_interval: DEFAULT_RECONNECT_INTERVAL, subscription_buffer_capacity: DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY, + #[cfg(feature = "http-subscription")] + poll_interval: crate::DEFAULT_POLL_INTERVAL, + #[cfg(feature = "http-subscription")] + allow_http_subscriptions: false, } } diff --git a/src/robust_provider/subscription.rs b/src/robust_provider/subscription.rs index cb5ae5b..6d54bf4 100644 --- a/src/robust_provider/subscription.rs +++ b/src/robust_provider/subscription.rs @@ -18,6 +18,11 @@ use tokio_util::sync::ReusableBoxFuture; use crate::robust_provider::{CoreError, RobustProvider}; +#[cfg(feature = "http-subscription")] +use crate::robust_provider::http_subscription::{ + HttpPollingSubscription, HttpSubscriptionConfig, HttpSubscriptionError, +}; + /// Errors that can occur when using [`RobustSubscription`]. #[derive(Error, Debug, Clone)] pub enum Error { @@ -55,37 +60,96 @@ impl From for Error { } } +#[cfg(feature = "http-subscription")] +impl From for Error { + fn from(err: HttpSubscriptionError) -> Self { + match err { + HttpSubscriptionError::Timeout => Error::Timeout, + HttpSubscriptionError::RpcError(e) => Error::RpcError(e), + HttpSubscriptionError::Closed => Error::Closed, + HttpSubscriptionError::BlockFetchFailed(msg) => { + // Use custom_str which returns RpcError directly + Error::RpcError(Arc::new(TransportErrorKind::custom_str(&msg))) + } + } + } +} + /// Default time interval between primary provider reconnection attempts pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::from_secs(30); +/// Timeout for validating HTTP provider reachability during reconnection +pub const HTTP_RECONNECT_VALIDATION_TIMEOUT: Duration = Duration::from_millis(150); + +/// Backend for subscriptions - either native WebSocket or HTTP polling. +/// +/// This enum allows `RobustSubscription` to transparently handle both +/// WebSocket-based and HTTP polling-based subscriptions. +#[derive(Debug)] +pub(crate) enum SubscriptionBackend { + /// Native WebSocket subscription using pubsub + WebSocket(Subscription), + /// HTTP polling-based subscription (requires `http-subscription` feature) + #[cfg(feature = "http-subscription")] + HttpPolling(HttpPollingSubscription), +} + /// A robust subscription wrapper that automatically handles provider failover /// and periodic reconnection attempts to the primary provider. #[derive(Debug)] pub struct RobustSubscription { - subscription: Subscription, + backend: SubscriptionBackend, robust_provider: RobustProvider, last_reconnect_attempt: Option, current_fallback_index: Option, + /// Configuration for HTTP polling (stored for failover to HTTP providers) + #[cfg(feature = "http-subscription")] + http_config: HttpSubscriptionConfig, } impl RobustSubscription { - /// Create a new [`RobustSubscription`] + /// Create a new [`RobustSubscription`] with a WebSocket backend. pub(crate) fn new( subscription: Subscription, robust_provider: RobustProvider, + ) -> Self { + #[cfg(feature = "http-subscription")] + let http_config = HttpSubscriptionConfig { + poll_interval: robust_provider.poll_interval, + call_timeout: robust_provider.call_timeout, + buffer_capacity: robust_provider.subscription_buffer_capacity, + }; + + Self { + backend: SubscriptionBackend::WebSocket(subscription), + robust_provider, + last_reconnect_attempt: None, + current_fallback_index: None, + #[cfg(feature = "http-subscription")] + http_config, + } + } + + /// Create a new [`RobustSubscription`] with an HTTP polling backend. + #[cfg(feature = "http-subscription")] + pub(crate) fn new_http( + subscription: HttpPollingSubscription, + robust_provider: RobustProvider, + config: HttpSubscriptionConfig, ) -> Self { Self { - subscription, + backend: SubscriptionBackend::HttpPolling(subscription), robust_provider, last_reconnect_attempt: None, current_fallback_index: None, + http_config: config, } } /// Receive the next item from the subscription with automatic failover. /// /// This method will: - /// * Attempt to receive from the current subscription + /// * Attempt to receive from the current subscription (WebSocket or HTTP polling) /// * Handle errors by switching to fallback providers /// * Periodically attempt to reconnect to the primary provider /// * Will switch to fallback providers if subscription timeout is exhausted @@ -108,21 +172,47 @@ impl RobustSubscription { let subscription_timeout = self.robust_provider.subscription_timeout; loop { - match timeout(subscription_timeout, self.subscription.recv()).await { - Ok(Ok(header)) => { + // Receive from the appropriate backend + let result = match &mut self.backend { + SubscriptionBackend::WebSocket(sub) => { + match timeout(subscription_timeout, sub.recv()).await { + Ok(Ok(header)) => Ok(header), + Ok(Err(recv_error)) => Err(Error::from(recv_error)), + Err(_elapsed) => Err(Error::Timeout), + } + } + #[cfg(feature = "http-subscription")] + SubscriptionBackend::HttpPolling(sub) => { + match timeout(subscription_timeout, sub.recv()).await { + Ok(Ok(header)) => Ok(header), + Ok(Err(e)) => Err(Error::from(e)), + Err(_elapsed) => Err(Error::Timeout), + } + } + }; + + match result { + Ok(header) => { if self.is_on_fallback() { self.try_reconnect_to_primary(false).await; } return Ok(header); } - Ok(Err(recv_error)) => return Err(recv_error.into()), - Err(_elapsed) => { + Err(Error::Timeout) => { warn!( timeout_secs = subscription_timeout.as_secs(), "Subscription timeout - no block received, switching provider" ); self.switch_to_fallback(CoreError::Timeout).await?; } + // Propagate these errors directly without failover + Err(Error::Closed) => return Err(Error::Closed), + Err(Error::Lagged(count)) => return Err(Error::Lagged(count)), + // RPC errors trigger failover + Err(Error::RpcError(_e)) => { + warn!("Subscription RPC error, switching provider"); + self.switch_to_fallback(CoreError::Timeout).await?; + } } } } @@ -131,8 +221,8 @@ impl RobustSubscription { /// Returns true if reconnection was successful, false if it's not time yet or if it failed. async fn try_reconnect_to_primary(&mut self, force: bool) -> bool { // Check if we should attempt reconnection - let should_reconnect = force || - match self.last_reconnect_attempt { + let should_reconnect = force + || match self.last_reconnect_attempt { None => false, Some(last_attempt) => { last_attempt.elapsed() >= self.robust_provider.reconnect_interval @@ -143,23 +233,50 @@ impl RobustSubscription { return false; } - let operation = - move |provider: RootProvider| async move { provider.subscribe_blocks().await }; - let primary = self.robust_provider.primary(); - let subscription = - self.robust_provider.try_provider_with_timeout(primary, &operation).await; - - if let Ok(sub) = subscription { - info!("Reconnected to primary provider"); - self.subscription = sub; - self.current_fallback_index = None; - self.last_reconnect_attempt = None; - true - } else { - self.last_reconnect_attempt = Some(Instant::now()); - false + + // Try WebSocket subscription first if supported + if Self::supports_pubsub(primary) { + let operation = + move |provider: RootProvider| async move { provider.subscribe_blocks().await }; + + let subscription = + self.robust_provider.try_provider_with_timeout(primary, &operation).await; + + if let Ok(sub) = subscription { + info!("Reconnected to primary provider (WebSocket)"); + self.backend = SubscriptionBackend::WebSocket(sub); + self.current_fallback_index = None; + self.last_reconnect_attempt = None; + return true; + } + } + + // Try HTTP polling if enabled and WebSocket not available/failed + #[cfg(feature = "http-subscription")] + if self.robust_provider.allow_http_subscriptions { + let validation = + tokio::time::timeout(HTTP_RECONNECT_VALIDATION_TIMEOUT, primary.get_block_number()) + .await; + + if matches!(validation, Ok(Ok(_))) { + if let Ok(http_sub) = HttpPollingSubscription::new( + self.robust_provider.clone(), + self.http_config.clone(), + ) + .await + { + info!("Reconnected to primary provider (HTTP polling)"); + self.backend = SubscriptionBackend::HttpPolling(http_sub); + self.current_fallback_index = None; + self.last_reconnect_attempt = None; + return true; + } + } } + + self.last_reconnect_attempt = Some(Instant::now()); + false } async fn switch_to_fallback(&mut self, last_error: CoreError) -> Result<(), Error> { @@ -172,21 +289,55 @@ impl RobustSubscription { self.last_reconnect_attempt = Some(Instant::now()); } - let operation = - move |provider: RootProvider| async move { provider.subscribe_blocks().await }; - // Start searching from the next provider after the current one let start_index = self.current_fallback_index.map_or(0, |idx| idx + 1); + let fallback_providers = self.robust_provider.fallback_providers(); + + // Try each fallback provider + for (idx, provider) in fallback_providers.iter().enumerate().skip(start_index) { + // Try WebSocket subscription first if provider supports pubsub + if Self::supports_pubsub(provider) { + let operation = move |p: RootProvider| async move { p.subscribe_blocks().await }; + + if let Ok(sub) = + self.robust_provider.try_provider_with_timeout(provider, &operation).await + { + info!( + fallback_index = idx, + "Subscription switched to fallback provider (WebSocket)" + ); + self.backend = SubscriptionBackend::WebSocket(sub); + self.current_fallback_index = Some(idx); + return Ok(()); + } + } - let (sub, fallback_idx) = self - .robust_provider - .try_fallback_providers_from(&operation, true, last_error, start_index) - .await?; + // Try HTTP polling if enabled + #[cfg(feature = "http-subscription")] + if self.robust_provider.allow_http_subscriptions { + if let Ok(http_sub) = HttpPollingSubscription::new( + self.robust_provider.clone(), + self.http_config.clone(), + ) + .await + { + info!( + fallback_index = idx, + "Subscription switched to fallback provider (HTTP polling)" + ); + self.backend = SubscriptionBackend::HttpPolling(http_sub); + self.current_fallback_index = Some(idx); + return Ok(()); + } + } + } - info!(fallback_index = fallback_idx, "Subscription switched to fallback provider"); - self.subscription = sub; - self.current_fallback_index = Some(fallback_idx); - Ok(()) + // All fallbacks exhausted + error!( + attempted_providers = fallback_providers.len() + 1, + "All providers exhausted for subscription" + ); + Err(last_error.into()) } /// Returns true if currently using a fallback provider @@ -194,10 +345,19 @@ impl RobustSubscription { self.current_fallback_index.is_some() } + /// Check if a provider supports native pubsub (WebSocket) + fn supports_pubsub(provider: &RootProvider) -> bool { + provider.client().pubsub_frontend().is_some() + } + /// Check if the subscription channel is empty (no pending messages) #[must_use] - pub fn is_empty(&self) -> bool { - self.subscription.is_empty() + pub fn is_empty(&mut self) -> bool { + match &mut self.backend { + SubscriptionBackend::WebSocket(sub) => sub.is_empty(), + #[cfg(feature = "http-subscription")] + SubscriptionBackend::HttpPolling(sub) => sub.is_empty(), + } } /// Convert the subscription into a stream. diff --git a/tests/http_subscription.rs b/tests/http_subscription.rs new file mode 100644 index 0000000..459a638 --- /dev/null +++ b/tests/http_subscription.rs @@ -0,0 +1,745 @@ +//! Integration tests for HTTP subscription functionality. +//! +//! These tests verify that HTTP providers can participate in subscriptions +//! via polling when the `http-subscription` feature is enabled. + +#![cfg(feature = "http-subscription")] + +mod common; + +use std::time::Duration; + +use alloy::{ + network::Ethereum, + node_bindings::Anvil, + providers::{Provider, ProviderBuilder, RootProvider, ext::AnvilApi}, +}; +use common::{BUFFER_TIME, SHORT_TIMEOUT}; +use robust_provider::{RobustProviderBuilder, SubscriptionError}; +use tokio_stream::StreamExt; + +// ============================================================================ +// Test Helpers +// ============================================================================ + +/// Short poll interval for tests +const TEST_POLL_INTERVAL: Duration = Duration::from_millis(50); + +async fn spawn_http_anvil() +-> anyhow::Result<(alloy::node_bindings::AnvilInstance, RootProvider)> { + let anvil = Anvil::new().try_spawn()?; + let provider = RootProvider::new_http(anvil.endpoint_url()); + Ok((anvil, provider)) +} + +async fn spawn_ws_anvil() +-> anyhow::Result<(alloy::node_bindings::AnvilInstance, RootProvider)> { + let anvil = Anvil::new().try_spawn()?; + let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; + Ok((anvil, provider.root().clone())) +} + +// ============================================================================ +// Basic HTTP Subscription Tests +// ============================================================================ + +/// Test: HTTP polling subscription receives blocks correctly +#[tokio::test] +async fn test_http_subscription_basic_flow() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine a block + provider.anvil_mine(Some(1), None).await?; + + // Should receive block 1 + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout waiting for block 1") + .expect("recv error"); + assert_eq!(block.number, 1, "Should receive block 1"); + + // Mine another block + provider.anvil_mine(Some(1), None).await?; + + // Should receive block 2 + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout waiting for block 2") + .expect("recv error"); + assert_eq!(block.number, 2, "Should receive block 2"); + + Ok(()) +} + +/// Test: HTTP subscription correctly receives multiple consecutive blocks +#[tokio::test] +async fn test_http_subscription_multiple_blocks() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine and receive 5 blocks sequentially + for expected_block in 1..=5 { + provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, expected_block, "Block number mismatch"); + } + + Ok(()) +} + +/// Test: HTTP subscription works correctly when converted to a Stream +#[tokio::test] +async fn test_http_subscription_as_stream() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let subscription = robust.subscribe_blocks().await?; + let mut stream = subscription.into_stream(); + + // Mine and receive via stream + provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), stream.next()) + .await + .expect("timeout") + .expect("stream ended unexpectedly") + .expect("recv error"); + assert_eq!(block.number, 1); + + // Mine another and receive via stream + provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), stream.next()) + .await + .expect("timeout") + .expect("stream ended unexpectedly") + .expect("recv error"); + assert_eq!(block.number, 2); + + Ok(()) +} + +// ============================================================================ +// Failover Tests +// ============================================================================ + +/// Test: When WS primary dies, subscription fails over to HTTP fallback +/// +/// Verification: We confirm failover by checking that after WS death, +/// we still receive blocks (which must come from HTTP since WS is dead) +#[tokio::test] +async fn test_failover_ws_to_http_on_provider_death() -> anyhow::Result<()> { + let (anvil_ws, ws_provider) = spawn_ws_anvil().await?; + let (_anvil_http, http_provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::fragile(ws_provider.clone()) + .fallback(http_provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Receive initial block from WS + ws_provider.anvil_mine(Some(1), None).await?; + let block = subscription.recv().await?; + assert_eq!(block.number, 1, "Should receive from WS primary"); + + // Kill WS provider - this will cause subscription to fail + drop(anvil_ws); + + // Spawn task to mine on HTTP after timeout triggers failover + let http_clone = http_provider.clone(); + tokio::spawn(async move { + tokio::time::sleep(SHORT_TIMEOUT + BUFFER_TIME).await; + http_clone.anvil_mine(Some(1), None).await.unwrap(); + }); + + // Should eventually receive a block - since WS is dead, this MUST be from HTTP + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout - failover may have failed") + .expect("recv error"); + + // We received a block after WS died, proving failover worked + // (HTTP starts at genesis, so we get block 0 or 1 depending on timing) + assert!(block.number <= 1, "Should receive low block number from HTTP fallback"); + + Ok(()) +} + +/// Test: When HTTP primary becomes unavailable, subscription fails over to WS fallback +#[tokio::test] +async fn test_failover_http_to_ws_on_provider_death() -> anyhow::Result<()> { + let (anvil_http, http_provider) = spawn_http_anvil().await?; + let (_anvil_ws, ws_provider) = spawn_ws_anvil().await?; + + let robust = RobustProviderBuilder::fragile(http_provider.clone()) + .fallback(ws_provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine and receive from HTTP + http_provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, 1, "Should start on HTTP primary"); + + // Kill HTTP provider + drop(anvil_http); + + // Mine on WS shortly after HTTP error is detected. + // The HTTP poll will fail quickly (connection refused), triggering immediate failover to WS. + // We mine after a small delay to ensure WS subscription is established. + let ws_clone = ws_provider.clone(); + tokio::spawn(async move { + tokio::time::sleep(SHORT_TIMEOUT + BUFFER_TIME).await; + ws_clone.anvil_mine(Some(1), None).await.unwrap(); + }); + + // Should receive from WS fallback (WS also starts at genesis, so block 1 after mining) + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout - failover may have failed") + .expect("recv error"); + + assert_eq!(block.number, 1, "Should receive block from WS fallback"); + + Ok(()) +} + +// ============================================================================ +// Configuration Tests +// ============================================================================ + +/// Test: All-HTTP provider chain works (no WS providers at all) +#[tokio::test] +async fn test_http_only_provider_chain() -> anyhow::Result<()> { + let (_anvil1, http1) = spawn_http_anvil().await?; + let (_anvil2, http2) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(http1.clone()) + .fallback(http2.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine and receive + http1.anvil_mine(Some(1), None).await?; + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + + http1.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, 2); + + Ok(()) +} + +/// Test: When allow_http_subscriptions is false (default), HTTP providers are skipped +/// and subscription uses WS fallback +#[tokio::test] +async fn test_http_subscriptions_disabled_skips_http() -> anyhow::Result<()> { + let (_anvil_http, http_provider) = spawn_http_anvil().await?; + let (_anvil_ws, ws_provider) = spawn_ws_anvil().await?; + + // HTTP primary but http subscriptions NOT enabled (default) + let robust = RobustProviderBuilder::new(http_provider.clone()) + .fallback(ws_provider.clone()) + // allow_http_subscriptions defaults to false + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + // subscribe_blocks should skip HTTP and use WS + let mut subscription = robust.subscribe_blocks().await?; + + // Mine on both - if HTTP was used, we'd get block 0 first + // Since HTTP is skipped, we should only see WS blocks + ws_provider.anvil_mine(Some(1), None).await?; + http_provider.anvil_mine(Some(5), None).await?; // Mine more on HTTP + + let block = subscription.recv().await?; + // WS block 1, not HTTP block 0 or 5 + assert_eq!(block.number, 1, "Should use WS fallback, not HTTP primary"); + + Ok(()) +} + +/// Test: When allow_http_subscriptions is false and no WS providers exist, +/// subscribe_blocks should fail +#[tokio::test] +async fn test_http_disabled_no_ws_fails() -> anyhow::Result<()> { + let (_anvil, http_provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(http_provider.clone()) + // No fallbacks, HTTP subscriptions disabled + .subscription_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Should fail because no pubsub-capable provider exists + let result = robust.subscribe_blocks().await; + assert!(result.is_err(), "Should fail when no WS providers and HTTP disabled"); + + Ok(()) +} + +/// Test: poll_interval configuration is respected +#[tokio::test] +async fn test_poll_interval_is_respected() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let poll_interval = Duration::from_millis(200); + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(poll_interval) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine first block and receive it + provider.anvil_mine(Some(1), None).await?; + let _ = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + + // Mine another block + provider.anvil_mine(Some(1), None).await?; + + // Measure how long it takes to receive the next block + let start = std::time::Instant::now(); + let _ = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + let elapsed = start.elapsed(); + + // Should take at least half the poll interval + // (being lenient because block might arrive mid-interval) + let min_expected = poll_interval / 2; + assert!( + elapsed >= min_expected, + "Poll interval not respected. Expected >= {:?}, got {:?}", + min_expected, + elapsed + ); + + Ok(()) +} + +// ============================================================================ +// Error Handling Tests +// ============================================================================ + +/// Test: HTTP subscription handles provider errors gracefully +#[tokio::test] +async fn test_http_subscription_survives_temporary_errors() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine blocks - subscription should work + for i in 1..=3 { + provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, i); + } + + Ok(()) +} + +/// Test: When all providers fail, subscription returns an error +#[tokio::test] +async fn test_all_providers_fail_returns_error() -> anyhow::Result<()> { + let (anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::fragile(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine and receive a block first + provider.anvil_mine(Some(1), None).await?; + let _ = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + + // Kill the only provider + drop(anvil); + + // Next recv should eventually error (after timeout) + let result = tokio::time::timeout(Duration::from_secs(5), subscription.recv()).await; + + match result { + Ok(Ok(_)) => panic!("Should not receive block from dead provider"), + Ok(Err(e)) => { + // Expected - got an error + assert!( + matches!(e, SubscriptionError::Timeout | SubscriptionError::RpcError(_)), + "Expected Timeout or RpcError, got {:?}", + e + ); + } + Err(_) => { + // Timeout is also acceptable + } + } + + Ok(()) +} + +// ============================================================================ +// Deduplication Tests +// ============================================================================ + +/// Test: HTTP polling correctly deduplicates blocks (same block not emitted twice) +#[tokio::test] +async fn test_http_polling_deduplication() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(Duration::from_millis(20)) // Very fast polling + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine first block + provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, 1); + + // Wait for multiple poll cycles without mining + tokio::time::sleep(Duration::from_millis(100)).await; + + // Now mine ONE more block + provider.anvil_mine(Some(1), None).await?; + + // Should receive exactly block 2 (not duplicate of block 1) + let block = tokio::time::timeout(Duration::from_secs(1), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, 2, "Should receive block 2, not duplicate of 1"); + + Ok(()) +} + +// ============================================================================ +// Configuration Propagation Tests +// ============================================================================ + +/// Test: poll_interval from builder is used when subscription fails over to HTTP +/// +/// This verifies fix for bug where http_config used defaults instead of +/// user-configured values when a WebSocket subscription was created first. +#[tokio::test] +async fn test_poll_interval_propagated_from_builder() -> anyhow::Result<()> { + let (_anvil_ws, ws_provider) = spawn_ws_anvil().await?; + let (_anvil_http, http_provider) = spawn_http_anvil().await?; + + // Use a distinctive poll interval that's different from the default (12s) + let custom_poll_interval = Duration::from_millis(30); + + let robust = RobustProviderBuilder::fragile(ws_provider.clone()) + .fallback(http_provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(custom_poll_interval) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + // Start subscription on WebSocket + let mut subscription = robust.subscribe_blocks().await?; + + ws_provider.anvil_mine(Some(1), None).await?; + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + + // Kill WS to force failover to HTTP + drop(_anvil_ws); + + // Mine on HTTP and wait for failover + let http_clone = http_provider.clone(); + tokio::spawn(async move { + tokio::time::sleep(SHORT_TIMEOUT + BUFFER_TIME).await; + http_clone.anvil_mine(Some(1), None).await.unwrap(); + }); + + // Should receive block from HTTP fallback + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout waiting for HTTP fallback block") + .expect("recv error"); + + // Verify we got a block (proving failover worked with correct config) + assert!(block.number <= 1); + + // Now verify the poll interval is being used by timing block reception + // Mine another block and measure how long until we receive it + http_provider.anvil_mine(Some(1), None).await?; + + let start = std::time::Instant::now(); + let _ = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + let elapsed = start.elapsed(); + + // Should take roughly poll_interval to detect the new block + // Allow some margin but it should be much less than the default 12s + assert!( + elapsed < Duration::from_millis(500), + "Poll interval not respected. Elapsed {:?}, expected ~{:?}", + elapsed, + custom_poll_interval + ); + + Ok(()) +} + +// ============================================================================ +// HTTP Reconnection Validation Tests +// ============================================================================ + +/// Test: HTTP reconnection validates provider is reachable before claiming success +/// +/// This verifies fix for bug where HTTP reconnection didn't validate the provider, +/// potentially "reconnecting" to a dead provider. +#[tokio::test] +async fn test_http_reconnect_validates_provider() -> anyhow::Result<()> { + // Start with HTTP primary (will be killed) and HTTP fallback + let (anvil_primary, primary) = spawn_http_anvil().await?; + let (_anvil_fallback, fallback) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::fragile(primary.clone()) + .fallback(fallback.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .reconnect_interval(Duration::from_millis(100)) // Fast reconnect for test + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine a block on primary after subscription + primary.anvil_mine(Some(1), None).await?; + + // Get initial block from primary + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + + // Kill primary - subscription should failover to fallback + drop(anvil_primary); + + // Trigger failover by waiting for timeout, then mine on fallback + let fb_clone = fallback.clone(); + tokio::spawn(async move { + tokio::time::sleep(SHORT_TIMEOUT + BUFFER_TIME).await; + fb_clone.anvil_mine(Some(1), None).await.unwrap(); + }); + + // Should receive from fallback (block 1 on fallback) + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + let fallback_block = block.number; + assert_eq!(fallback_block, 1, "Should receive block 1 from fallback"); + + // Wait for reconnect interval to elapse + tokio::time::sleep(Duration::from_millis(150)).await; + + // Mine another block on fallback - this triggers reconnect attempt + // Since primary is dead, reconnect should FAIL validation and stay on fallback + fallback.anvil_mine(Some(1), None).await?; + + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + + // Should still be on fallback (next block), NOT have "reconnected" to dead primary + assert!( + block.number > fallback_block, + "Should still be on fallback after failed reconnect, got block {}", + block.number + ); + + Ok(()) +} + +/// Test: Timeout-triggered failover cycles through multiple fallbacks correctly +/// +/// When a fallback times out (no blocks received), the subscription should: +/// 1. Try to reconnect to primary (fails if dead) +/// 2. Move to the next fallback +/// 3. Eventually receive blocks from a working fallback +#[tokio::test] +async fn test_timeout_triggered_failover_with_multiple_fallbacks() -> anyhow::Result<()> { + let (anvil_primary, primary) = spawn_http_anvil().await?; + let (_anvil_fb1, fallback1) = spawn_http_anvil().await?; + let (_anvil_fb2, fallback2) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::fragile(primary.clone()) + .fallback(fallback1.clone()) + .fallback(fallback2.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine a block on primary after subscription + primary.anvil_mine(Some(1), None).await?; + + // Get initial block from primary + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + + // Kill primary AND fallback1 - only fallback2 will work + drop(anvil_primary); + drop(_anvil_fb1); + + // Don't mine on fallback2 immediately - let timeouts trigger failover + // After SHORT_TIMEOUT, primary poll fails -> try fallback1 + // After SHORT_TIMEOUT, fallback1 poll fails -> try fallback2 + // Then mine on fallback2 + let fb2_clone = fallback2.clone(); + tokio::spawn(async move { + // Wait for a timeout cycle plus buffer + tokio::time::sleep(SHORT_TIMEOUT + Duration::from_millis(50)).await; + fb2_clone.anvil_mine(Some(1), None).await.unwrap(); + }); + + // Should eventually receive from fallback2 + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout - failover chain may have failed") + .expect("recv error"); + + // Block should be from fallback2 (block number >= 1) + assert!(block.number >= 1, "Should receive block from fallback2, got {}", block.number); + + Ok(()) +} + +/// Test: Single fallback timeout behavior +/// +/// When there's only one fallback and it times out, after exhausting reconnect +/// attempts, the subscription should return an error (no more providers to try). +#[tokio::test] +async fn test_single_fallback_timeout_exhausts_providers() -> anyhow::Result<()> { + let (anvil_primary, primary) = spawn_http_anvil().await?; + let (_anvil_fb, fallback) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::fragile(primary.clone()) + .fallback(fallback.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + primary.anvil_mine(Some(1), None).await?; + + // Get initial block from primary + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + + // Kill both providers + drop(anvil_primary); + drop(_anvil_fb); + + // Don't mine anything - let it timeout and exhaust providers + let result = tokio::time::timeout(Duration::from_secs(3), subscription.recv()).await; + + match result { + Ok(Err(SubscriptionError::Timeout)) => { + // Expected: all providers exhausted, returns timeout error + } + Ok(Err(SubscriptionError::RpcError(_))) => { + // Also acceptable: RPC error from dead providers + } + Ok(Ok(block)) => { + panic!("Should not receive block, got block {}", block.number); + } + Err(_) => { + // Outer timeout - also acceptable, means it's still trying + } + Ok(Err(e)) => { + panic!("Unexpected error type: {:?}", e); + } + } + + Ok(()) +} diff --git a/tests/rpc_failover.rs b/tests/rpc_failover.rs new file mode 100644 index 0000000..dfceb28 --- /dev/null +++ b/tests/rpc_failover.rs @@ -0,0 +1,273 @@ +//! Tests for RPC call retry and failover functionality. + +mod common; + +use std::time::{Duration, Instant}; + +use alloy::{ + eips::BlockNumberOrTag, + node_bindings::Anvil, + providers::{Provider, ProviderBuilder, ext::AnvilApi}, +}; +use robust_provider::{Error, RobustProviderBuilder}; + +// ============================================================================ +// RPC Failover Tests +// ============================================================================ + +#[tokio::test] +async fn test_rpc_failover_when_primary_dead() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + // Mine different number of blocks on each to distinguish them + primary.anvil_mine(Some(10), None).await?; + fallback.anvil_mine(Some(20), None).await?; + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Verify primary is used initially + let block_num = robust.get_block_number().await?; + assert_eq!(block_num, 10); + + // Kill primary + drop(anvil_primary); + + // Should failover to fallback + let block_num = robust.get_block_number().await?; + assert_eq!(block_num, 20); + + Ok(()) +} + +#[tokio::test] +async fn test_rpc_cycles_through_multiple_fallbacks() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fb1 = Anvil::new().try_spawn()?; + let anvil_fb2 = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fb1 = ProviderBuilder::new().connect_http(anvil_fb1.endpoint_url()); + let fb2 = ProviderBuilder::new().connect_http(anvil_fb2.endpoint_url()); + + // Mine different blocks to identify each provider + primary.anvil_mine(Some(10), None).await?; + fb1.anvil_mine(Some(20), None).await?; + fb2.anvil_mine(Some(30), None).await?; + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fb1) + .fallback(fb2) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Kill primary and first fallback + drop(anvil_primary); + drop(anvil_fb1); + + // Should cycle through to fb2 + let block_num = robust.get_block_number().await?; + assert_eq!(block_num, 30); + + Ok(()) +} + +#[tokio::test] +async fn test_rpc_all_providers_fail() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(1)) + .build() + .await?; + + // Kill all providers + drop(anvil_primary); + drop(anvil_fallback); + + // Should fail after trying all providers + let result = robust.get_block_number().await; + assert!(result.is_err()); + + Ok(()) +} + +// ============================================================================ +// Non-Retryable Error Tests +// ============================================================================ + +#[tokio::test] +async fn test_block_not_found_does_not_retry() -> anyhow::Result<()> { + let anvil = Anvil::new().try_spawn()?; + let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + + let robust = RobustProviderBuilder::new(provider) + .call_timeout(Duration::from_secs(5)) + .max_retries(3) + .min_delay(Duration::from_millis(100)) + .build() + .await?; + + let start = Instant::now(); + + // Request future block - should be BlockNotFound, not retried + let result = robust.get_block(alloy::eips::BlockId::number(999_999)).await; + + let elapsed = start.elapsed(); + + assert!(matches!(result, Err(Error::BlockNotFound))); + // With retries, this would take 300ms+ due to backoff + assert!(elapsed < Duration::from_millis(200)); + + Ok(()) +} + +// ============================================================================ +// Timeout Tests +// ============================================================================ + +#[tokio::test] +async fn test_operation_completes_when_provider_unavailable() -> anyhow::Result<()> { + // Create and immediately kill provider so endpoint doesn't exist + let anvil = Anvil::new().try_spawn()?; + let endpoint = anvil.endpoint_url(); + drop(anvil); + + let provider = ProviderBuilder::new().connect_http(endpoint); + + let robust = RobustProviderBuilder::fragile(provider) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + let start = Instant::now(); + let result = robust.get_block_number().await; + let elapsed = start.elapsed(); + + // Should fail (connection refused) and not hang + assert!(result.is_err()); + assert!(elapsed < Duration::from_secs(5)); + + Ok(()) +} + +// ============================================================================ +// Failover with Different Operations +// ============================================================================ + +#[tokio::test] +async fn test_get_accounts_failover() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Kill primary + drop(anvil_primary); + + let accounts = robust.get_accounts().await?; + assert!(!accounts.is_empty()); + + Ok(()) +} + +#[tokio::test] +async fn test_get_balance_failover() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + let accounts = fallback.get_accounts().await?; + let address = accounts[0]; + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Kill primary + drop(anvil_primary); + + let balance = robust.get_balance(address).await?; + assert!(balance > alloy::primitives::U256::ZERO); + + Ok(()) +} + +#[tokio::test] +async fn test_get_block_failover() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + fallback.anvil_mine(Some(5), None).await?; + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Kill primary + drop(anvil_primary); + + let block = robust.get_block_by_number(BlockNumberOrTag::Number(3)).await?; + assert_eq!(block.header.number, 3); + + Ok(()) +} + +// ============================================================================ +// Primary Provider Preference +// ============================================================================ + +#[tokio::test] +async fn test_primary_provider_tried_first() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + primary.anvil_mine(Some(100), None).await?; + fallback.anvil_mine(Some(200), None).await?; + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Multiple calls should all use primary (it's healthy) + for _ in 0..5 { + let block_num = robust.get_block_number().await?; + assert_eq!(block_num, 100); + } + + Ok(()) +} diff --git a/tests/subscription.rs b/tests/subscription.rs index 64969e7..4e0eb6e 100644 --- a/tests/subscription.rs +++ b/tests/subscription.rs @@ -78,7 +78,7 @@ async fn test_successful_subscription_on_primary() -> anyhow::Result<()> { .build() .await?; - let subscription = robust.subscribe_blocks().await?; + let mut subscription = robust.subscribe_blocks().await?; // Subscription is created successfully - is_empty() returns true initially (no pending // messages) assert!(subscription.is_empty());