diff --git a/CHANGELOG.md b/CHANGELOG.md index 9367f78..bab699d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,14 @@ # Changelog +## 1.5.6 /2025-10-08 +* Clean Up Error Handling by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/193 +* Avoids ID of 'None' in queries by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/196 +* Allows AsyncSubstrateInterface's Websocket connection to not automatically shut down by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/197 +* return type annotation for `get_metadata_call_function` by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/199 +* Change responses["results"] to deque by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/198 +* do not attempt to reconnect if there are open subscriptions by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/200 + +**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.5.5...v1.5.6 + ## 1.5.5 /2025-10-06 * Improve timeout task cancellation by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/190 diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 2cda046..a93af0c 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -24,6 +24,7 @@ import websockets.exceptions from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string +from scalecodec import GenericVariant from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject from scalecodec.type_registry import load_type_registry_preset from scalecodec.types import ( @@ -526,9 +527,9 @@ class Websocket: def __init__( self, ws_url: str, - max_subscriptions=1024, - max_connections=100, - shutdown_timer=5, + max_subscriptions: int = 1024, + max_connections: int = 100, + shutdown_timer: Optional[float] = 5.0, options: Optional[dict] = None, _log_raw_websockets: bool = False, retry_timeout: float = 60.0, @@ -542,7 +543,9 @@ def __init__( ws_url: Websocket URL to connect to max_subscriptions: Maximum number of subscriptions per websocket connection max_connections: Maximum number of connections total - shutdown_timer: Number of seconds to shut down websocket connection after last use + shutdown_timer: Number of seconds to shut down websocket connection after last use. If set to `None`, the + connection will never be automatically shut down. Use this for very long-running processes, where you + will manually shut down the connection if ever you intend to close it. options: Options to pass to the websocket connection _log_raw_websockets: Whether to log raw websockets in the "raw_websocket" logger retry_timeout: Timeout in seconds to retry websocket connection @@ -643,6 +646,10 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: self._attempts += 1 is_retry = True if should_reconnect is True: + if len(self._received_subscriptions) > 0: + return SubstrateRequestException( + f"Unable to reconnect because there are currently open subscriptions." + ) for original_id, payload in list(self._inflight.items()): self._received[original_id] = loop.create_future() to_send = json.loads(payload) @@ -659,17 +666,24 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: return e elif isinstance(e := send_task.result(), Exception): return e + elif len(self._received_subscriptions) > 0: + return SubstrateRequestException( + f"Currently open subscriptions while disconnecting. " + f"Ensure these are unsubscribed from before closing in the future." + ) + return None async def __aexit__(self, exc_type, exc_val, exc_tb): - if not self.state != State.CONNECTING: - if self._exit_task is not None: - self._exit_task.cancel() - try: - await self._exit_task - except asyncio.CancelledError: - pass - if self.ws is not None: - self._exit_task = asyncio.create_task(self._exit_with_timer()) + if self.shutdown_timer is not None: + if not self.state != State.CONNECTING: + if self._exit_task is not None: + self._exit_task.cancel() + try: + await self._exit_task + except asyncio.CancelledError: + pass + if self.ws is not None: + self._exit_task = asyncio.create_task(self._exit_with_timer()) async def _exit_with_timer(self): """ @@ -677,7 +691,8 @@ async def _exit_with_timer(self): for reuse of the websocket connection. """ try: - await asyncio.sleep(self.shutdown_timer) + if self.shutdown_timer is not None: + await asyncio.sleep(self.shutdown_timer) await self.shutdown() except asyncio.CancelledError: pass @@ -1407,7 +1422,8 @@ async def retrieve_pending_extrinsics(self) -> list: runtime = await self.init_runtime() result_data = await self.rpc_request("author_pendingExtrinsics", []) - + if "error" in result_data: + raise SubstrateRequestException(result_data["error"]["message"]) extrinsics = [] for extrinsic_data in result_data["result"]: @@ -2141,6 +2157,8 @@ async def get_parent_block_hash(self, block_hash) -> str: async def _get_parent_block_hash(self, block_hash) -> str: block_header = await self.rpc_request("chain_getHeader", [block_hash]) + if "error" in block_header: + raise SubstrateRequestException(block_header["error"]["message"]) if block_header["result"] is None: raise SubstrateRequestException(f'Block not found for "{block_hash}"') @@ -2172,15 +2190,7 @@ async def get_storage_by_key(self, block_hash: str, storage_key: str) -> Any: response = await self.rpc_request( "state_getStorage", [storage_key, block_hash] ) - - if "result" in response: - return response.get("result") - elif "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - else: - raise SubstrateRequestException( - "Unknown error occurred during retrieval of events" - ) + return response.get("result") @cached_fetcher(max_size=SUBSTRATE_RUNTIME_CACHE_SIZE) async def get_block_runtime_info(self, block_hash: str) -> dict: @@ -2236,9 +2246,6 @@ async def get_block_metadata( params = [block_hash] response = await self.rpc_request("state_getMetadata", params) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - if (result := response.get("result")) and decode: metadata_decoder = runtime_config.create_scale_object( "MetadataVersioned", data=ScaleBytes(result) @@ -2304,8 +2311,13 @@ async def _preprocess( metadata=runtime.metadata, ) method = "state_getStorageAt" + queryable = ( + str(query_for) + if query_for is not None + else f"{method}{random.randint(0, 7000)}" + ) return Preprocessed( - str(query_for), + queryable, method, [storage_key.to_hex(), block_hash], value_scale_type, @@ -2553,7 +2565,7 @@ async def _get_block_hash(self, block_id: int) -> str: return (await self.rpc_request("chain_getBlockHash", [block_id]))["result"] async def get_chain_head(self) -> str: - result = await self._make_rpc_request( + response = await self._make_rpc_request( [ self.make_payload( "rpc_request", @@ -2562,8 +2574,11 @@ async def get_chain_head(self) -> str: ) ] ) - self.last_block_hash = result["rpc_request"][0]["result"] - return result["rpc_request"][0]["result"] + result = response["rpc_request"][0] + if "error" in result: + raise SubstrateRequestException(result["error"]["message"]) + self.last_block_hash = result["result"] + return result["result"] async def compose_call( self, @@ -2690,9 +2705,6 @@ async def query_multi( runtime=runtime, ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - result = [] storage_key_map = {s.to_hex(): s for s in storage_keys} @@ -3044,12 +3056,7 @@ async def get_chain_finalised_head(self): """ response = await self.rpc_request("chain_getFinalizedHead", []) - - if response is not None: - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - - return response.get("result") + return response["result"] async def _do_runtime_call_old( self, @@ -3092,6 +3099,8 @@ async def _do_runtime_call_old( [f"{api}_{method}", param_data.hex(), block_hash], runtime=runtime, ) + if "error" in result_data: + raise SubstrateRequestException(result_data["error"]["message"]) result_vec_u8_bytes = hex_to_bytes(result_data["result"]) result_bytes = await self.decode_scale( "Vec", result_vec_u8_bytes, runtime=runtime @@ -3185,6 +3194,8 @@ async def runtime_call( [f"{api}_{method}", param_data.hex(), block_hash], runtime=runtime, ) + if "error" in result_data: + raise SubstrateRequestException(result_data["error"]["message"]) output_type_string = f"scale_info::{runtime_call_def['output']}" # Decode result @@ -3237,6 +3248,8 @@ async def get_account_next_index(self, account_address: str) -> int: nonce_obj = await self.rpc_request( "account_nextIndex", [account_address] ) + if "error" in nonce_obj: + raise SubstrateRequestException(nonce_obj["error"]["message"]) self._nonces[account_address] = nonce_obj["result"] else: self._nonces[account_address] += 1 @@ -3622,9 +3635,6 @@ async def query_map( method="state_getKeys", params=[prefix, block_hash], runtime=runtime ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - result_keys = response.get("result") result = [] @@ -3640,8 +3650,6 @@ async def query_map( params=[result_keys, block_hash], runtime=runtime, ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) for result_group in response["result"]: result = decode_query_map( result_group["changes"], @@ -3680,8 +3688,6 @@ async def query_map( ) ) for response in all_responses: - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) for result_group in response["result"]: changes.extend(result_group["changes"]) @@ -3905,9 +3911,6 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]: "author_submitExtrinsic", [str(extrinsic.data)] ) - if "result" not in response: - raise SubstrateRequestException(response.get("error")) - result = AsyncExtrinsicReceipt( substrate=self, extrinsic_hash=response["result"] ) @@ -3919,10 +3922,9 @@ async def get_metadata_call_function( module_name: str, call_function_name: str, block_hash: Optional[str] = None, - ) -> Optional[list]: + ) -> Optional[GenericVariant]: """ - Retrieves a list of all call functions in metadata active for given block_hash (or chaintip if block_hash - is omitted) + Retrieves specified call from the metadata at the block specified, or the chain tip if omitted. Args: module_name: name of the module @@ -3930,7 +3932,7 @@ async def get_metadata_call_function( block_hash: optional block hash Returns: - list of call functions + The dict-like call definition, if found. None otherwise. """ runtime = await self.init_runtime(block_hash=block_hash) @@ -3994,12 +3996,8 @@ async def get_block_number(self, block_hash: Optional[str] = None) -> int: """Async version of `substrateinterface.base.get_block_number` method.""" response = await self.rpc_request("chain_getHeader", [block_hash]) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - - elif "result" in response: - if response["result"]: - return int(response["result"]["number"], 16) + if response["result"]: + return int(response["result"]["number"], 16) raise SubstrateRequestException( f"Unable to retrieve block number for {block_hash}" ) diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 61efc54..1575a8e 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -13,6 +13,7 @@ GenericRuntimeCallDefinition, ss58_encode, MultiAccountId, + GenericVariant, ) from scalecodec.base import ScaleBytes, ScaleType from websockets.sync.client import connect, ClientConnection @@ -1709,8 +1710,6 @@ def get_storage_by_key(self, block_hash: str, storage_key: str) -> Any: if "result" in response: return response.get("result") - elif "error" in response: - raise SubstrateRequestException(response["error"]["message"]) else: raise SubstrateRequestException( "Unknown error occurred during retrieval of events" @@ -1761,9 +1760,6 @@ def get_block_metadata( params = [block_hash] response = self.rpc_request("state_getMetadata", params) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - if (result := response.get("result")) and decode: metadata_decoder = self.runtime_config.create_scale_object( "MetadataVersioned", data=ScaleBytes(result) @@ -1825,8 +1821,13 @@ def _preprocess( metadata=self.runtime.metadata, ) method = "state_getStorageAt" + queryable = ( + str(query_for) + if query_for is not None + else f"{method}{random.randint(0, 7000)}" + ) return Preprocessed( - str(query_for), + queryable, method, [storage_key.to_hex(), block_hash], value_scale_type, @@ -2073,7 +2074,7 @@ def get_block_hash(self, block_id: int) -> str: return self.rpc_request("chain_getBlockHash", [block_id])["result"] def get_chain_head(self) -> str: - result = self._make_rpc_request( + response = self._make_rpc_request( [ self.make_payload( "rpc_request", @@ -2082,8 +2083,11 @@ def get_chain_head(self) -> str: ) ] ) - self.last_block_hash = result["rpc_request"][0]["result"] - return result["rpc_request"][0]["result"] + result = response["rpc_request"][0] + if "error" in result: + raise SubstrateRequestException(result["error"]["message"]) + self.last_block_hash = result["result"] + return result["result"] def compose_call( self, @@ -2191,9 +2195,6 @@ def query_multi( "state_queryStorageAt", [[s.to_hex() for s in storage_keys], block_hash] ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - result = [] storage_key_map = {s.to_hex(): s for s in storage_keys} @@ -2528,12 +2529,7 @@ def get_chain_finalised_head(self): """ response = self.rpc_request("chain_getFinalizedHead", []) - - if response is not None: - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - - return response.get("result") + return response["result"] def _do_runtime_call_old( self, @@ -3051,9 +3047,6 @@ def query_map( params=[prefix, page_size, start_key, block_hash], ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - result_keys = response.get("result") result = [] @@ -3067,9 +3060,6 @@ def query_map( method="state_queryStorageAt", params=[result_keys, block_hash] ) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - for result_group in response["result"]: result = decode_query_map( result_group["changes"], @@ -3301,10 +3291,9 @@ def get_metadata_call_function( module_name: str, call_function_name: str, block_hash: Optional[str] = None, - ) -> Optional[list]: + ) -> Optional[GenericVariant]: """ - Retrieves a list of all call functions in metadata active for given block_hash (or chaintip if block_hash - is omitted) + Retrieves specified call from the metadata at the block specified, or the chain tip if omitted. Args: module_name: name of the module @@ -3312,7 +3301,7 @@ def get_metadata_call_function( block_hash: optional block hash Returns: - list of call functions + The dict-like call definition, if found. None otherwise. """ self.init_runtime(block_hash=block_hash) @@ -3376,15 +3365,12 @@ def get_block_number(self, block_hash: Optional[str] = None) -> int: """Async version of `substrateinterface.base.get_block_number` method.""" response = self.rpc_request("chain_getHeader", [block_hash]) - if "error" in response: - raise SubstrateRequestException(response["error"]["message"]) - - elif "result" in response: - if response["result"]: - return int(response["result"]["number"], 16) - raise SubstrateRequestException( - f"Unable to determine block number for {block_hash}" - ) + if response["result"]: + return int(response["result"]["number"], 16) + else: + raise SubstrateRequestException( + f"Unable to determine block number for {block_hash}" + ) def close(self): """ diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index ecacca8..a5cac00 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -1,6 +1,6 @@ import logging from abc import ABC -from collections import defaultdict +from collections import defaultdict, deque from collections.abc import Iterable from dataclasses import dataclass from datetime import datetime @@ -374,7 +374,9 @@ class RequestManager: def __init__(self, payloads): self.response_map = {} - self.responses = defaultdict(lambda: {"complete": False, "results": []}) + self.responses = defaultdict( + lambda: {"complete": False, "results": deque(maxlen=100)} + ) self.payloads_count = len(payloads) def add_request(self, item_id: str, request_id: str): diff --git a/pyproject.toml b/pyproject.toml index 451672d..2fd3b5b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "async-substrate-interface" -version = "1.5.5" +version = "1.5.6" description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface" readme = "README.md" license = { file = "LICENSE" }