Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/cont_integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ jobs:
- default
- blocking
- blocking-https
- blocking-https-rustls
- blocking-https-native
- blocking-https-bundled
- blocking-https-rustls
- blocking-https-rustls-probe
- async
- async-https
- async-https-native
- async-https-rustls
- async-https-rustls-manual-roots
- async-https-rustls-probe
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
25 changes: 12 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ serde_json = { version = "1.0", default-features = false }
bitcoin = { version = "0.32", features = ["serde", "std"], default-features = false }
hex = { version = "0.2", package = "hex-conservative" }
log = "^0.4"
minreq = { version = "2.11.0", features = ["json-using-serde"], optional = true }
reqwest = { version = "0.12", features = ["json"], default-features = false, optional = true }
bitreq = { version = "0.3.4", optional = true }

# default async runtime
tokio = { version = "1", features = ["time"], optional = true }
Expand All @@ -37,16 +36,16 @@ electrsd = { version = "0.36.1", features = ["legacy", "esplora_a33e97e1", "core
lazy_static = "1.4.0"

[features]
default = ["blocking", "async", "async-https", "tokio"]
blocking = ["minreq", "minreq/proxy"]
blocking-https = ["blocking", "minreq/https"]
blocking-https-rustls = ["blocking", "minreq/https-rustls"]
blocking-https-native = ["blocking", "minreq/https-native"]
blocking-https-bundled = ["blocking", "minreq/https-bundled"]
default = ["blocking", "blocking-https", "async", "async-https", "tokio"]
blocking = ["bitreq/proxy", "bitreq/json-using-serde"]
blocking-https = ["blocking", "bitreq/https"]
blocking-https-native = ["blocking", "bitreq/https-native-tls"]
blocking-https-rustls = ["blocking", "bitreq/https-rustls"]
blocking-https-rustls-probe = ["blocking", "bitreq/https-rustls-probe"]

tokio = ["dep:tokio"]
async = ["reqwest", "reqwest/socks", "tokio?/time"]
async-https = ["async", "reqwest/default-tls"]
async-https-native = ["async", "reqwest/native-tls"]
async-https-rustls = ["async", "reqwest/rustls-tls"]
async-https-rustls-manual-roots = ["async", "reqwest/rustls-tls-manual-roots"]
async = ["bitreq/async", "bitreq/proxy", "bitreq/json-using-serde", "tokio?/time"]
async-https = ["async", "bitreq/async-https"]
async-https-native = ["async", "bitreq/async-https-native-tls"]
async-https-rustls = ["async", "bitreq/async-https-rustls"]
async-https-rustls-probe = ["async", "bitreq/async-https-rustls-probe"]
4 changes: 1 addition & 3 deletions ci/pin-msrv.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,5 @@ set -euo pipefail

# Pin dependencies for MSRV (1.75.0)
cargo update -p minreq --precise "2.13.2"
cargo update -p idna_adapter --precise "1.2.0"
cargo update -p native-tls --precise "0.2.13"
cargo update -p zerofrom --precise "0.1.5"
cargo update -p litemap --precise "0.7.4"
cargo update -p getrandom@0.4.1 --precise "0.3.4"
182 changes: 99 additions & 83 deletions src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ use bitcoin::hashes::{sha256, Hash};
use bitcoin::hex::{DisplayHex, FromHex};
use bitcoin::{Address, Block, BlockHash, MerkleBlock, Script, Transaction, Txid};

#[allow(unused_imports)]
use log::{debug, error, info, trace};

use reqwest::{header, Body, Client, Response};
use bitreq::{Client, RequestExt, Response};

use crate::{
AddressStats, BlockInfo, BlockStatus, BlockSummary, Builder, Error, MempoolRecentTx,
Expand All @@ -35,61 +32,41 @@ use crate::{
};

/// An async client for interacting with an Esplora API server.
#[derive(Debug, Clone)]
// FIXME: (@oleonardolima) there's no `Debug` implementation for `bitreq::Client`.
#[derive(Clone)]
pub struct AsyncClient<S = DefaultSleeper> {
/// The URL of the Esplora Server.
url: String,
/// The inner [`reqwest::Client`] to make HTTP requests.
client: Client,
/// The proxy is ignored when targeting `wasm32`.
proxy: Option<String>,
/// Socket timeout.
timeout: Option<u64>,
/// HTTP headers to set on every request made to Esplora server
headers: HashMap<String, String>,
/// Number of times to retry a request
max_retries: usize,
/// The inner [`reqwest::Client`] to make HTTP requests.
client: Client,
/// Marker for the type of sleeper used
marker: PhantomData<S>,
}

impl<S: Sleeper> AsyncClient<S> {
/// Build an [`AsyncClient`] from a [`Builder`].
pub fn from_builder(builder: Builder) -> Result<Self, Error> {
let mut client_builder = Client::builder();

#[cfg(not(target_arch = "wasm32"))]
if let Some(proxy) = &builder.proxy {
client_builder = client_builder.proxy(reqwest::Proxy::all(proxy)?);
}

#[cfg(not(target_arch = "wasm32"))]
if let Some(timeout) = builder.timeout {
client_builder = client_builder.timeout(core::time::Duration::from_secs(timeout));
}

if !builder.headers.is_empty() {
let mut headers = header::HeaderMap::new();
for (k, v) in builder.headers {
let header_name = header::HeaderName::from_lowercase(k.to_lowercase().as_bytes())
.map_err(|_| Error::InvalidHttpHeaderName(k))?;
let header_value = header::HeaderValue::from_str(&v)
.map_err(|_| Error::InvalidHttpHeaderValue(v))?;
headers.insert(header_name, header_value);
}
client_builder = client_builder.default_headers(headers);
}
// TODO: (@oleonardolima) we should expose this to the final user through `Builder`.
let cached_connections = 10;
let client = Client::new(cached_connections);

Ok(AsyncClient {
url: builder.base_url,
client: client_builder.build()?,
proxy: builder.proxy,
timeout: builder.timeout,
headers: builder.headers,
max_retries: builder.max_retries,
marker: PhantomData,
})
}

/// Build an [`AsyncClient`] from a [`Client`].
pub fn from_client(url: String, client: Client) -> Self {
AsyncClient {
url,
client,
max_retries: crate::DEFAULT_MAX_RETRIES,
marker: PhantomData,
}
})
}

/// Make an HTTP GET request to given URL, deserializing to any `T` that
Expand All @@ -107,14 +84,13 @@ impl<S: Sleeper> AsyncClient<S> {
let url = format!("{}{}", self.url, path);
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
status: response.status().as_u16(),
message: response.text().await?,
});
if !is_success(&response) {
let status = u16::try_from(response.status_code).map_err(Error::StatusCode)?;
let message = response.as_str().unwrap_or_default().to_string();
return Err(Error::HttpResponse { status, message });
}

Ok(deserialize::<T>(&response.bytes().await?)?)
Ok(deserialize::<T>(response.as_bytes())?)
}

/// Make an HTTP GET request to given URL, deserializing to `Option<T>`.
Expand Down Expand Up @@ -147,14 +123,13 @@ impl<S: Sleeper> AsyncClient<S> {
let url = format!("{}{}", self.url, path);
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
status: response.status().as_u16(),
message: response.text().await?,
});
if !is_success(&response) {
let status = u16::try_from(response.status_code).map_err(Error::StatusCode)?;
let message = response.as_str().unwrap_or_default().to_string();
return Err(Error::HttpResponse { status, message });
}

response.json::<T>().await.map_err(Error::Reqwest)
response.json::<T>().map_err(Error::BitReq)
}

/// Make an HTTP GET request to given URL, deserializing to `Option<T>`.
Expand Down Expand Up @@ -189,15 +164,14 @@ impl<S: Sleeper> AsyncClient<S> {
let url = format!("{}{}", self.url, path);
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
status: response.status().as_u16(),
message: response.text().await?,
});
if !is_success(&response) {
let status = u16::try_from(response.status_code).map_err(Error::StatusCode)?;
let message = response.as_str().unwrap_or_default().to_string();
return Err(Error::HttpResponse { status, message });
}

let hex_str = response.text().await?;
Ok(deserialize(&Vec::from_hex(&hex_str)?)?)
let hex_str = response.as_str()?;
Ok(deserialize(&Vec::from_hex(hex_str)?)?)
}

/// Make an HTTP GET request to given URL, deserializing to `Option<T>`.
Expand Down Expand Up @@ -226,14 +200,13 @@ impl<S: Sleeper> AsyncClient<S> {
let url = format!("{}{}", self.url, path);
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
status: response.status().as_u16(),
message: response.text().await?,
});
if !is_success(&response) {
let status = u16::try_from(response.status_code).map_err(Error::StatusCode)?;
let message = response.as_str().unwrap_or_default().to_string();
return Err(Error::HttpResponse { status, message });
}

Ok(response.text().await?)
Ok(response.as_str()?.to_string())
}

/// Make an HTTP GET request to given URL, deserializing to `Option<T>`.
Expand All @@ -257,26 +230,25 @@ impl<S: Sleeper> AsyncClient<S> {
///
/// This function will return an error either from the HTTP client, or the
/// response's [`serde_json`] deserialization.
async fn post_request_bytes<T: Into<Body>>(
async fn post_request_bytes<T: Into<Vec<u8>>>(
&self,
path: &str,
body: T,
query_params: Option<HashSet<(&str, String)>>,
) -> Result<Response, Error> {
let url: String = format!("{}{}", self.url, path);
let mut request = self.client.post(url).body(body);
let mut request: bitreq::Request = bitreq::post(url).with_body(body);

for param in query_params.unwrap_or_default() {
request = request.query(&param);
for (key, value) in query_params.unwrap_or_default() {
request = request.with_param(key, value);
}

let response = request.send().await?;
let response = request.send_async_with_client(&self.client).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
status: response.status().as_u16(),
message: response.text().await?,
});
if !is_success(&response) {
let status = u16::try_from(response.status_code).map_err(Error::StatusCode)?;
let message = response.as_str().unwrap_or_default().to_string();
return Err(Error::HttpResponse { status, message });
}

Ok(response)
Expand Down Expand Up @@ -375,7 +347,7 @@ impl<S: Sleeper> AsyncClient<S> {
pub async fn broadcast(&self, transaction: &Transaction) -> Result<Txid, Error> {
let body = serialize::<Transaction>(transaction).to_lower_hex_string();
let response = self.post_request_bytes("/tx", body, None).await?;
let txid = Txid::from_str(&response.text().await?).map_err(Error::HexToArray)?;
let txid = Txid::from_str(response.as_str()?).map_err(Error::HexToArray)?;
Ok(txid)
}

Expand Down Expand Up @@ -414,7 +386,7 @@ impl<S: Sleeper> AsyncClient<S> {
)
.await?;

Ok(response.json::<SubmitPackageResult>().await?)
Ok(response.json::<SubmitPackageResult>()?)
}

/// Get the current height of the blockchain tip
Expand Down Expand Up @@ -606,21 +578,65 @@ impl<S: Sleeper> AsyncClient<S> {
let mut delay = BASE_BACKOFF_MILLIS;
let mut attempts = 0;

let mut request = bitreq::get(url);

#[cfg(not(target_arch = "wasm32"))]
if let Some(proxy) = &self.proxy {
use bitreq::Proxy;

let proxy = Proxy::new_http(proxy.as_str())?;
request = request.with_proxy(proxy);
}

#[cfg(not(target_arch = "wasm32"))]
if let Some(timeout) = &self.timeout {
request = request.with_timeout(*timeout);
}

if !self.headers.is_empty() {
request = request.with_headers(&self.headers);
}

loop {
match self.client.get(url).send().await? {
resp if attempts < self.max_retries && is_status_retryable(resp.status()) => {
match request.clone().send_async_with_client(&self.client).await? {
response if attempts < self.max_retries && is_retryable(&response) => {
S::sleep(delay).await;
attempts += 1;
delay *= 2;
}
resp => return Ok(resp),
response => return Ok(response),
}
}
}
}

fn is_status_retryable(status: reqwest::StatusCode) -> bool {
RETRYABLE_ERROR_CODES.contains(&status.as_u16())
// /// Check if [`Response`] status is within 100-199.
// fn is_informational(response: &Response) -> bool {
// (100..200).contains(&response.status_code)
// }

/// Check if [`Response`] status is within 200-299.
fn is_success(response: &Response) -> bool {
(200..300).contains(&response.status_code)
}

// /// Check if [`Response`] status is within 300-399.
// fn is_redirection(response: &Response) -> bool {
// (300..400).contains(&response.status_code)
// }

// /// Check if [`Response`] status is within 400-499.
// fn is_client_error(response: &Response) -> bool {
// (400..500).contains(&response.status_code)
// }

// /// Check if [`Response`] status is within 500-599.
// fn is_server_error(response: &Response) -> bool {
// (500..600).contains(&response.status_code)
// }

fn is_retryable(response: &Response) -> bool {
RETRYABLE_ERROR_CODES.contains(&(response.status_code as u16))
}

/// Sleeper trait that allows any async runtime to be used.
Expand Down
Loading
Loading