From 3e77a3bb276d84cb88769c70f5fbe22307e20e7d Mon Sep 17 00:00:00 2001 From: bdhimes Date: Mon, 6 Oct 2025 20:57:52 +0200 Subject: [PATCH 1/9] Eliminates a lot of double-checking for "error" in response, and adds error checking for this for `get_chain_head` --- async_substrate_interface/async_substrate.py | 54 ++++++-------------- async_substrate_interface/sync_substrate.py | 38 ++++---------- 2 files changed, 25 insertions(+), 67 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 2cda046..b2bdd40 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -1407,7 +1407,8 @@ async def retrieve_pending_extrinsics(self) -> list: runtime = await self.init_runtime() result_data = await self.rpc_request("author_pendingExtrinsics", []) - + if "error" in result_data: + raise SubstrateRequestException(result_data["error"]["message"]) extrinsics = [] for extrinsic_data in result_data["result"]: @@ -2141,6 +2142,8 @@ async def get_parent_block_hash(self, block_hash) -> str: async def _get_parent_block_hash(self, block_hash) -> str: block_header = await self.rpc_request("chain_getHeader", [block_hash]) + if "error" in block_header: + raise SubstrateRequestException(block_header["error"]["message"]) if block_header["result"] is None: raise SubstrateRequestException(f'Block not found for "{block_hash}"') @@ -2172,15 +2175,7 @@ async def get_storage_by_key(self, block_hash: str, storage_key: str) -> Any: response = await self.rpc_request( "state_getStorage", [storage_key, block_hash] ) - - if "result" in response: - return response.get("result") - elif "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - else: - raise SubstrateRequestException( - "Unknown error occurred during retrieval of events" - ) + return response.get("result") @cached_fetcher(max_size=SUBSTRATE_RUNTIME_CACHE_SIZE) async def get_block_runtime_info(self, block_hash: str) -> dict: @@ -2236,9 +2231,6 @@ async def get_block_metadata( params = [block_hash] response = await self.rpc_request("state_getMetadata", params) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - if (result := response.get("result")) and decode: metadata_decoder = runtime_config.create_scale_object( "MetadataVersioned", data=ScaleBytes(result) @@ -2562,6 +2554,8 @@ async def get_chain_head(self) -> str: ) ] ) + if "error" in result[0]: + raise SubstrateRequestException(result[0]["error"]["message"]) self.last_block_hash = result["rpc_request"][0]["result"] return result["rpc_request"][0]["result"] @@ -2690,9 +2684,6 @@ async def query_multi( runtime=runtime, ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - result = [] storage_key_map = {s.to_hex(): s for s in storage_keys} @@ -3044,12 +3035,7 @@ async def get_chain_finalised_head(self): """ response = await self.rpc_request("chain_getFinalizedHead", []) - - if response is not None: - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - - return response.get("result") + return response["result"] async def _do_runtime_call_old( self, @@ -3092,6 +3078,8 @@ async def _do_runtime_call_old( [f"{api}_{method}", param_data.hex(), block_hash], runtime=runtime, ) + if "error" in result_data: + raise SubstrateRequestException(result_data["error"]["message"]) result_vec_u8_bytes = hex_to_bytes(result_data["result"]) result_bytes = await self.decode_scale( "Vec", result_vec_u8_bytes, runtime=runtime @@ -3185,6 +3173,8 @@ async def runtime_call( [f"{api}_{method}", param_data.hex(), block_hash], runtime=runtime, ) + if "error" in result_data: + raise SubstrateRequestException(result_data["error"]["message"]) output_type_string = f"scale_info::{runtime_call_def['output']}" # Decode result @@ -3237,6 +3227,8 @@ async def get_account_next_index(self, account_address: str) -> int: nonce_obj = await self.rpc_request( "account_nextIndex", [account_address] ) + if "error" in nonce_obj: + raise SubstrateRequestException(nonce_obj["error"]["message"]) self._nonces[account_address] = nonce_obj["result"] else: self._nonces[account_address] += 1 @@ -3622,9 +3614,6 @@ async def query_map( method="state_getKeys", params=[prefix, block_hash], runtime=runtime ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - result_keys = response.get("result") result = [] @@ -3640,8 +3629,6 @@ async def query_map( params=[result_keys, block_hash], runtime=runtime, ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) for result_group in response["result"]: result = decode_query_map( result_group["changes"], @@ -3680,8 +3667,6 @@ async def query_map( ) ) for response in all_responses: - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) for result_group in response["result"]: changes.extend(result_group["changes"]) @@ -3905,9 +3890,6 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]: "author_submitExtrinsic", [str(extrinsic.data)] ) - if "result" not in response: - raise SubstrateRequestException(response.get("error")) - result = AsyncExtrinsicReceipt( substrate=self, extrinsic_hash=response["result"] ) @@ -3994,12 +3976,8 @@ async def get_block_number(self, block_hash: Optional[str] = None) -> int: """Async version of `substrateinterface.base.get_block_number` method.""" response = await self.rpc_request("chain_getHeader", [block_hash]) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - - elif "result" in response: - if response["result"]: - return int(response["result"]["number"], 16) + if response["result"]: + return int(response["result"]["number"], 16) raise SubstrateRequestException( f"Unable to retrieve block number for {block_hash}" ) diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 61efc54..5854ba2 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -1709,8 +1709,6 @@ def get_storage_by_key(self, block_hash: str, storage_key: str) -> Any: if "result" in response: return response.get("result") - elif "error" in response: - raise SubstrateRequestException(response["error"]["message"]) else: raise SubstrateRequestException( "Unknown error occurred during retrieval of events" @@ -1761,9 +1759,6 @@ def get_block_metadata( params = [block_hash] response = self.rpc_request("state_getMetadata", params) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - if (result := response.get("result")) and decode: metadata_decoder = self.runtime_config.create_scale_object( "MetadataVersioned", data=ScaleBytes(result) @@ -2082,6 +2077,8 @@ def get_chain_head(self) -> str: ) ] ) + if "error" in result[0]: + raise SubstrateRequestException(result[0]["error"]["message"]) self.last_block_hash = result["rpc_request"][0]["result"] return result["rpc_request"][0]["result"] @@ -2191,9 +2188,6 @@ def query_multi( "state_queryStorageAt", [[s.to_hex() for s in storage_keys], block_hash] ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - result = [] storage_key_map = {s.to_hex(): s for s in storage_keys} @@ -2528,12 +2522,7 @@ def get_chain_finalised_head(self): """ response = self.rpc_request("chain_getFinalizedHead", []) - - if response is not None: - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - - return response.get("result") + return response["result"] def _do_runtime_call_old( self, @@ -3051,9 +3040,6 @@ def query_map( params=[prefix, page_size, start_key, block_hash], ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - result_keys = response.get("result") result = [] @@ -3067,9 +3053,6 @@ def query_map( method="state_queryStorageAt", params=[result_keys, block_hash] ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - for result_group in response["result"]: result = decode_query_map( result_group["changes"], @@ -3376,15 +3359,12 @@ def get_block_number(self, block_hash: Optional[str] = None) -> int: """Async version of `substrateinterface.base.get_block_number` method.""" response = self.rpc_request("chain_getHeader", [block_hash]) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - - elif "result" in response: - if response["result"]: - return int(response["result"]["number"], 16) - raise SubstrateRequestException( - f"Unable to determine block number for {block_hash}" - ) + if response["result"]: + return int(response["result"]["number"], 16) + else: + raise SubstrateRequestException( + f"Unable to determine block number for {block_hash}" + ) def close(self): """ From 9c427d57a9280d9086356c568850865312b46bb1 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Mon, 6 Oct 2025 21:04:38 +0200 Subject: [PATCH 2/9] Oepsie --- async_substrate_interface/async_substrate.py | 11 ++++++----- async_substrate_interface/sync_substrate.py | 11 ++++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index b2bdd40..3ccf8af 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -2545,7 +2545,7 @@ async def _get_block_hash(self, block_id: int) -> str: return (await self.rpc_request("chain_getBlockHash", [block_id]))["result"] async def get_chain_head(self) -> str: - result = await self._make_rpc_request( + response = await self._make_rpc_request( [ self.make_payload( "rpc_request", @@ -2554,10 +2554,11 @@ async def get_chain_head(self) -> str: ) ] ) - if "error" in result[0]: - raise SubstrateRequestException(result[0]["error"]["message"]) - self.last_block_hash = result["rpc_request"][0]["result"] - return result["rpc_request"][0]["result"] + result = response["rpc_request"][0] + if "error" in result: + raise SubstrateRequestException(result["error"]["message"]) + self.last_block_hash = result["result"] + return result["result"] async def compose_call( self, diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 5854ba2..1622d78 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -2068,7 +2068,7 @@ def get_block_hash(self, block_id: int) -> str: return self.rpc_request("chain_getBlockHash", [block_id])["result"] def get_chain_head(self) -> str: - result = self._make_rpc_request( + response = self._make_rpc_request( [ self.make_payload( "rpc_request", @@ -2077,10 +2077,11 @@ def get_chain_head(self) -> str: ) ] ) - if "error" in result[0]: - raise SubstrateRequestException(result[0]["error"]["message"]) - self.last_block_hash = result["rpc_request"][0]["result"] - return result["rpc_request"][0]["result"] + result = response["rpc_request"][0] + if "error" in result: + raise SubstrateRequestException(result["error"]["message"]) + self.last_block_hash = result["result"] + return result["result"] def compose_call( self, From 85b9a25dd02f92d4746e187e4b7d977ebc12bac8 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Wed, 8 Oct 2025 15:37:01 +0200 Subject: [PATCH 3/9] Avoids ID of 'None' in queries --- async_substrate_interface/async_substrate.py | 7 ++++++- async_substrate_interface/sync_substrate.py | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 3ccf8af..6416f66 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -2296,8 +2296,13 @@ async def _preprocess( metadata=runtime.metadata, ) method = "state_getStorageAt" + queryable = ( + str(query_for) + if query_for is not None + else f"{method}{random.randint(0, 7000)}" + ) return Preprocessed( - str(query_for), + queryable, method, [storage_key.to_hex(), block_hash], value_scale_type, diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 1622d78..bb8c393 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -1820,8 +1820,13 @@ def _preprocess( metadata=self.runtime.metadata, ) method = "state_getStorageAt" + queryable = ( + str(query_for) + if query_for is not None + else f"{method}{random.randint(0, 7000)}" + ) return Preprocessed( - str(query_for), + queryable, method, [storage_key.to_hex(), block_hash], value_scale_type, From 1c82ade9bca569f95b39327dae5a9906338d654a Mon Sep 17 00:00:00 2001 From: bdhimes Date: Wed, 8 Oct 2025 15:56:21 +0200 Subject: [PATCH 4/9] Allows AsyncSubstrateInterface's Websocket connection to not automatically shut down For niche use-cases such as long-running processes which should never die. --- async_substrate_interface/async_substrate.py | 32 +++++++++++--------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 6416f66..283d438 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -526,9 +526,9 @@ class Websocket: def __init__( self, ws_url: str, - max_subscriptions=1024, - max_connections=100, - shutdown_timer=5, + max_subscriptions: int = 1024, + max_connections: int = 100, + shutdown_timer: Optional[float] = 5.0, options: Optional[dict] = None, _log_raw_websockets: bool = False, retry_timeout: float = 60.0, @@ -542,7 +542,9 @@ def __init__( ws_url: Websocket URL to connect to max_subscriptions: Maximum number of subscriptions per websocket connection max_connections: Maximum number of connections total - shutdown_timer: Number of seconds to shut down websocket connection after last use + shutdown_timer: Number of seconds to shut down websocket connection after last use. If set to `None`, the + connection will never be automatically shut down. Use this for very long-running processes, where you + will manually shut down the connection if ever you intend to close it. options: Options to pass to the websocket connection _log_raw_websockets: Whether to log raw websockets in the "raw_websocket" logger retry_timeout: Timeout in seconds to retry websocket connection @@ -661,15 +663,16 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: return e async def __aexit__(self, exc_type, exc_val, exc_tb): - if not self.state != State.CONNECTING: - if self._exit_task is not None: - self._exit_task.cancel() - try: - await self._exit_task - except asyncio.CancelledError: - pass - if self.ws is not None: - self._exit_task = asyncio.create_task(self._exit_with_timer()) + if self.shutdown_timer is not None: + if not self.state != State.CONNECTING: + if self._exit_task is not None: + self._exit_task.cancel() + try: + await self._exit_task + except asyncio.CancelledError: + pass + if self.ws is not None: + self._exit_task = asyncio.create_task(self._exit_with_timer()) async def _exit_with_timer(self): """ @@ -677,7 +680,8 @@ async def _exit_with_timer(self): for reuse of the websocket connection. """ try: - await asyncio.sleep(self.shutdown_timer) + if self.shutdown_timer is not None: + await asyncio.sleep(self.shutdown_timer) await self.shutdown() except asyncio.CancelledError: pass From d47af8807833853854551b44a0621dcc60756e5b Mon Sep 17 00:00:00 2001 From: bdhimes Date: Wed, 8 Oct 2025 16:02:50 +0200 Subject: [PATCH 5/9] Return path --- async_substrate_interface/async_substrate.py | 1 + 1 file changed, 1 insertion(+) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 283d438..e004ac7 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -661,6 +661,7 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: return e elif isinstance(e := send_task.result(), Exception): return e + return None async def __aexit__(self, exc_type, exc_val, exc_tb): if self.shutdown_timer is not None: From fac790ef2f4df2df0c8567efde1c5a5a0a1c4223 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Wed, 8 Oct 2025 16:54:14 +0200 Subject: [PATCH 6/9] Changes responses["results"] to a deque with a max length of 100 to prevent long-running processes (such as block handlers) from continuously increasing the memory usage. --- async_substrate_interface/types.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index ecacca8..a5cac00 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -1,6 +1,6 @@ import logging from abc import ABC -from collections import defaultdict +from collections import defaultdict, deque from collections.abc import Iterable from dataclasses import dataclass from datetime import datetime @@ -374,7 +374,9 @@ class RequestManager: def __init__(self, payloads): self.response_map = {} - self.responses = defaultdict(lambda: {"complete": False, "results": []}) + self.responses = defaultdict( + lambda: {"complete": False, "results": deque(maxlen=100)} + ) self.payloads_count = len(payloads) def add_request(self, item_id: str, request_id: str): From d0334551a8c473e4ea716794f055a1f4da29069e Mon Sep 17 00:00:00 2001 From: bdhimes Date: Wed, 8 Oct 2025 17:33:27 +0200 Subject: [PATCH 7/9] Fixes the return type annotation (and docstring) for `get_metadata_call_function` of both SubstrateInterface and AsyncSubstrateInterface --- async_substrate_interface/async_substrate.py | 8 ++++---- async_substrate_interface/sync_substrate.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index e004ac7..99ce28b 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -24,6 +24,7 @@ import websockets.exceptions from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string +from scalecodec import GenericVariant from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject from scalecodec.type_registry import load_type_registry_preset from scalecodec.types import ( @@ -3912,10 +3913,9 @@ async def get_metadata_call_function( module_name: str, call_function_name: str, block_hash: Optional[str] = None, - ) -> Optional[list]: + ) -> Optional[GenericVariant]: """ - Retrieves a list of all call functions in metadata active for given block_hash (or chaintip if block_hash - is omitted) + Retrieves specified call from the metadata at the block specified, or the chain tip if omitted. Args: module_name: name of the module @@ -3923,7 +3923,7 @@ async def get_metadata_call_function( block_hash: optional block hash Returns: - list of call functions + The dict-like call definition, if found. None otherwise. """ runtime = await self.init_runtime(block_hash=block_hash) diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index bb8c393..1575a8e 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -13,6 +13,7 @@ GenericRuntimeCallDefinition, ss58_encode, MultiAccountId, + GenericVariant, ) from scalecodec.base import ScaleBytes, ScaleType from websockets.sync.client import connect, ClientConnection @@ -3290,10 +3291,9 @@ def get_metadata_call_function( module_name: str, call_function_name: str, block_hash: Optional[str] = None, - ) -> Optional[list]: + ) -> Optional[GenericVariant]: """ - Retrieves a list of all call functions in metadata active for given block_hash (or chaintip if block_hash - is omitted) + Retrieves specified call from the metadata at the block specified, or the chain tip if omitted. Args: module_name: name of the module @@ -3301,7 +3301,7 @@ def get_metadata_call_function( block_hash: optional block hash Returns: - list of call functions + The dict-like call definition, if found. None otherwise. """ self.init_runtime(block_hash=block_hash) From fb729d29439d791fef13c1e5e9556b4882a71109 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Wed, 8 Oct 2025 20:28:58 +0200 Subject: [PATCH 8/9] Ensures that we do not attempt to reconnect if there are open subscriptions. --- async_substrate_interface/async_substrate.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 99ce28b..a93af0c 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -646,6 +646,10 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: self._attempts += 1 is_retry = True if should_reconnect is True: + if len(self._received_subscriptions) > 0: + return SubstrateRequestException( + f"Unable to reconnect because there are currently open subscriptions." + ) for original_id, payload in list(self._inflight.items()): self._received[original_id] = loop.create_future() to_send = json.loads(payload) @@ -662,6 +666,11 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: return e elif isinstance(e := send_task.result(), Exception): return e + elif len(self._received_subscriptions) > 0: + return SubstrateRequestException( + f"Currently open subscriptions while disconnecting. " + f"Ensure these are unsubscribed from before closing in the future." + ) return None async def __aexit__(self, exc_type, exc_val, exc_tb): From eebdb85e3336b11cf5622014d80c7b6daeb36f73 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Wed, 8 Oct 2025 21:26:11 +0200 Subject: [PATCH 9/9] version + changelog --- CHANGELOG.md | 10 ++++++++++ pyproject.toml | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9367f78..bab699d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,14 @@ # Changelog +## 1.5.6 /2025-10-08 +* Clean Up Error Handling by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/193 +* Avoids ID of 'None' in queries by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/196 +* Allows AsyncSubstrateInterface's Websocket connection to not automatically shut down by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/197 +* return type annotation for `get_metadata_call_function` by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/199 +* Change responses["results"] to deque by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/198 +* do not attempt to reconnect if there are open subscriptions by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/200 + +**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.5.5...v1.5.6 + ## 1.5.5 /2025-10-06 * Improve timeout task cancellation by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/190 diff --git a/pyproject.toml b/pyproject.toml index 451672d..2fd3b5b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "async-substrate-interface" -version = "1.5.5" +version = "1.5.6" description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface" readme = "README.md" license = { file = "LICENSE" }