diff --git a/src/client.rs b/src/client.rs index 7e4e626..939b9ed 100644 --- a/src/client.rs +++ b/src/client.rs @@ -38,63 +38,97 @@ pub struct Client { macro_rules! impl_inner_call { ( $self:expr, $name:ident $(, $args:expr)* ) => { { - let mut errors = vec![]; - loop { - let read_client = $self.client_type.read().unwrap(); - let res = match &*read_client { + impl_inner_call_impl($self, || { + match &*$self.client_type.read().unwrap() { ClientType::TCP(inner) => inner.$name( $($args, )* ), ClientType::SSL(inner) => inner.$name( $($args, )* ), ClientType::Socks5(inner) => inner.$name( $($args, )* ), - }; - drop(read_client); - match res { - Ok(val) => return Ok(val), - Err(Error::Protocol(_) | Error::AlreadySubscribed(_)) => { - return res; - }, + } + }) + }} +} + +fn impl_inner_call_impl( + self_: &Client, + mut f: impl FnMut() -> Result, +) -> Result { + let mut errors = vec![]; + loop { + match f() { + Ok(val) => return Ok(val), + res @ Err(Error::Protocol(_) | Error::AlreadySubscribed(_)) => { + return res; + } + Err(e) => impl_inner_call_impl_err(self_, &mut errors, e)?, + } + } +} + +fn impl_inner_call_impl_err( + self_: &Client, + errors: &mut Vec, + e: Error, +) -> Result<(), Error> { + let failed_attempts = errors.len() + 1; + + warn!( + "call '{}' failed with {}, retry: {}/{}", + stringify!($name), + e, + failed_attempts, + self_.config.retry() + ); + + errors.push(e); + + if retries_exhausted(failed_attempts, self_.config.retry()) { + warn!( + "call '{}' failed after {} attempts", + stringify!($name), + failed_attempts + ); + return Err(Error::AllAttemptsErrored(std::mem::take(errors))); + } + + // Only one thread will try to recreate the client getting the write lock, + // other eventual threads will get Err and will block at the beginning of + // previous loop when trying to read() + if let Ok(mut write_client) = self_.client_type.try_write() { + loop { + std::thread::sleep(std::time::Duration::from_secs( + (1 << errors.len()).min(30) as u64 + )); + match ClientType::from_config(&self_.url, &self_.config) { + Ok(new_client) => { + info!("Succesfully created new client"); + *write_client = new_client; + break; + } Err(e) => { let failed_attempts = errors.len() + 1; - if retries_exhausted(failed_attempts, $self.config.retry()) { - warn!("call '{}' failed after {} attempts", stringify!($name), failed_attempts); - return Err(Error::AllAttemptsErrored(errors)); - } - - warn!("call '{}' failed with {}, retry: {}/{}", stringify!($name), e, failed_attempts, $self.config.retry()); + warn!( + "re-creating client failed with {}, retry: {}/{}", + e, + failed_attempts, + self_.config.retry() + ); errors.push(e); - // Only one thread will try to recreate the client getting the write lock, - // other eventual threads will get Err and will block at the beginning of - // previous loop when trying to read() - if let Ok(mut write_client) = $self.client_type.try_write() { - loop { - std::thread::sleep(std::time::Duration::from_secs((1 << errors.len()).min(30) as u64)); - match ClientType::from_config(&$self.url, &$self.config) { - Ok(new_client) => { - info!("Succesfully created new client"); - *write_client = new_client; - break; - }, - Err(e) => { - let failed_attempts = errors.len() + 1; - - if retries_exhausted(failed_attempts, $self.config.retry()) { - warn!("re-creating client failed after {} attempts", failed_attempts); - return Err(Error::AllAttemptsErrored(errors)); - } - - warn!("re-creating client failed with {}, retry: {}/{}", e, failed_attempts, $self.config.retry()); - - errors.push(e); - } - } - } + if retries_exhausted(failed_attempts, self_.config.retry()) { + warn!( + "re-creating client failed after {} attempts", + failed_attempts + ); + return Err(Error::AllAttemptsErrored(std::mem::take(errors))); } - }, + } } - }} + } } + + Ok(()) } fn retries_exhausted(failed_attempts: usize, configured_retries: u8) -> bool { @@ -178,7 +212,7 @@ impl ElectrumApi for Client { // `RawClient::internal_raw_call_with_vec` method. let vec = params.into_iter().collect::>(); - impl_inner_call!(self, internal_raw_call_with_vec, method_name, vec.clone()); + impl_inner_call!(self, internal_raw_call_with_vec, method_name, vec.clone()) } #[inline] @@ -425,26 +459,18 @@ mod tests { // try. use std::net::TcpListener; - use std::sync::mpsc::channel; use std::time::{Duration, Instant}; let endpoint = std::env::var("TEST_ELECTRUM_TIMEOUT_PORT").unwrap_or("localhost:60000".into()); - let (sender, receiver) = channel(); + let listener = TcpListener::bind("127.0.0.1:60000").unwrap(); std::thread::spawn(move || { - let listener = TcpListener::bind("127.0.0.1:60000").unwrap(); - sender.send(()).unwrap(); - for _stream in listener.incoming() { std::thread::sleep(Duration::from_secs(60)) } }); - receiver - .recv_timeout(Duration::from_secs(5)) - .expect("Can't start local listener"); - let now = Instant::now(); let client = Client::from_config( &endpoint, @@ -457,4 +483,42 @@ mod tests { assert!(client.is_ok()); assert!(elapsed > Duration::from_secs(2)); } + + #[test] + fn impl_inner_call_all_attempts_has_all_errors() { + use std::net::TcpListener; + use std::time::Duration; + + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = listener.local_addr().unwrap().port(); + std::thread::spawn(move || { + for _stream in listener.incoming() { + std::thread::sleep(Duration::from_secs(2)); + } + }); + + let client = Client::from_config( + &format!("127.0.0.1:{}", port), + crate::config::ConfigBuilder::new().retry(3).build(), + ) + .unwrap(); + let msg = |n| format!("error #{}", n); + + let mut n = 0; + let res: Result<(), _> = impl_inner_call_impl(&client, || { + n = n + 1; + Err(Error::Message(msg(n))) + }); + assert_eq!(n, 4); + + let err = res.unwrap_err(); + let Error::AllAttemptsErrored(vec) = err else { + panic!() + }; + assert_eq!(vec.len(), n); + for (i, err) in vec.into_iter().enumerate() { + let Error::Message(m) = err else { panic!() }; + assert_eq!(m, msg(i + 1)); + } + } }