From 136ca771c83fa4999f414eff54f96bfc4a346547 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Tue, 12 Aug 2025 21:52:59 +0200 Subject: [PATCH 1/5] WIP check-in --- async_substrate_interface/utils/cache.py | 81 +++++++++++++++++++++--- pyproject.toml | 3 +- 2 files changed, 73 insertions(+), 11 deletions(-) diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py index 56af640..13318de 100644 --- a/async_substrate_interface/utils/cache.py +++ b/async_substrate_interface/utils/cache.py @@ -9,6 +9,9 @@ from pathlib import Path from typing import Callable, Any, Awaitable, Hashable, Optional +import aiosqlite + + USE_CACHE = True if os.getenv("NO_CACHE") != "1" else False CACHE_LOCATION = ( os.path.expanduser( @@ -21,6 +24,70 @@ logger = logging.getLogger("async_substrate_interface") +class AsyncSqliteDB: + _instances: dict[str, "AsyncSqliteDB"] = {} + _db: Optional[aiosqlite.Connection] = None + + def __new__(cls, chain_endpoint: str): + try: + return cls._instances[chain_endpoint] + except KeyError: + instance = super().__new__(cls) + cls._instances[chain_endpoint] = instance + return instance + + async def __call__(self, chain, func, args, kwargs) -> Optional[Any]: + if not self._db: + _ensure_dir() + self._db = await aiosqlite.connect(CACHE_LOCATION) + table_name = _get_table_name(func) + key = None + if not (local_chain := _check_if_local(chain)) or not USE_CACHE: + await self._db.execute( + f"""CREATE TABLE IF NOT EXISTS {table_name} + ( + rowid INTEGER PRIMARY KEY AUTOINCREMENT, + key BLOB, + value BLOB, + chain TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + """ + ) + await self._db.execute( + f"""CREATE TRIGGER IF NOT EXISTS prune_rows_trigger AFTER INSERT ON {table_name} + BEGIN + DELETE FROM {table_name} + WHERE rowid IN ( + SELECT rowid FROM {table_name} + ORDER BY created_at DESC + LIMIT -1 OFFSET 500 + ); + END;""" + ) + key = pickle.dumps((args, kwargs)) + try: + cursor: aiosqlite.Cursor = await 
self._db.execute( + f"SELECT value FROM {table_name} WHERE key=? AND chain=?", + (key, chain), + ) + result = await cursor.fetchone() + if result is not None: + return pickle.loads(result[0]) + except (pickle.PickleError, sqlite3.Error) as e: + logger.exception("Cache error", exc_info=e) + pass + + result = await func(*args, **kwargs) + if not local_chain or not USE_CACHE: + # TODO use a task here + await self._db.execute( + f"INSERT OR REPLACE INTO {table_name} (key, value, chain) VALUES (?,?,?)", + (key, pickle.dumps(result), chain), + ) + return result + + def _ensure_dir(): path = Path(CACHE_LOCATION).parent if not path.exists(): @@ -115,7 +182,8 @@ def inner(self, *args, **kwargs): ) # If not in DB, call func and store in DB - result = func(self, *args, **kwargs) + if result is None: + result = func(self, *args, **kwargs) if not local_chain or not USE_CACHE: _insert_into_cache(c, conn, table_name, key, result, chain) @@ -131,15 +199,8 @@ def async_sql_lru_cache(maxsize: Optional[int] = None): def decorator(func): @cached_fetcher(max_size=maxsize) async def inner(self, *args, **kwargs): - c, conn, table_name, key, result, chain, local_chain = ( - _shared_inner_fn_logic(func, self, args, kwargs) - ) - - # If not in DB, call func and store in DB - result = await func(self, *args, **kwargs) - if not local_chain or not USE_CACHE: - _insert_into_cache(c, conn, table_name, key, result, chain) - + async_sql_db = AsyncSqliteDB(self.url) + result = await async_sql_db(self.url, func, args, kwargs) return result return inner diff --git a/pyproject.toml b/pyproject.toml index 148b33a..02b7791 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,8 @@ dependencies = [ "bt-decode==v0.6.0", "scalecodec~=1.2.11", "websockets>=14.1", - "xxhash" + "xxhash", + "aiosqlite>=0.21.0,<1.0.0" ] requires-python = ">=3.9,<3.14" From ec825466b4e7369d9fd33a14364883d060654993 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Tue, 12 Aug 2025 22:39:32 +0200 Subject: [PATCH 2/5] 
Ensure closing of sqlite DB connection when closing DiskCachedAsyncSubstrateInterface --- async_substrate_interface/async_substrate.py | 13 +++++++++++++ async_substrate_interface/utils/cache.py | 12 +++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 20af1bb..f4f401e 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -63,6 +63,7 @@ from async_substrate_interface.utils.cache import ( async_sql_lru_cache, cached_fetcher, + AsyncSqliteDB, ) from async_substrate_interface.utils.decoding import ( _determine_if_old_runtime_call, @@ -4026,6 +4027,18 @@ class DiskCachedAsyncSubstrateInterface(AsyncSubstrateInterface): Experimental new class that uses disk-caching in addition to memory-caching for the cached methods """ + async def close(self): + """ + Closes the substrate connection, and the websocket connection. + """ + try: + await self.ws.shutdown() + except AttributeError: + pass + db_conn = AsyncSqliteDB(self.url) + if db_conn._db is not None: + await db_conn._db.close() + @async_sql_lru_cache(maxsize=SUBSTRATE_CACHE_METHOD_SIZE) async def get_parent_block_hash(self, block_hash): return await self._get_parent_block_hash(block_hash) diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py index 13318de..95a3033 100644 --- a/async_substrate_interface/utils/cache.py +++ b/async_substrate_interface/utils/cache.py @@ -36,7 +36,7 @@ def __new__(cls, chain_endpoint: str): cls._instances[chain_endpoint] = instance return instance - async def __call__(self, chain, func, args, kwargs) -> Optional[Any]: + async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any]: if not self._db: _ensure_dir() self._db = await aiosqlite.connect(CACHE_LOCATION) @@ -65,26 +65,28 @@ async def __call__(self, chain, func, args, kwargs) -> Optional[Any]: ); 
END;""" ) - key = pickle.dumps((args, kwargs)) + await self._db.commit() + key = pickle.dumps((args, kwargs or None)) try: cursor: aiosqlite.Cursor = await self._db.execute( f"SELECT value FROM {table_name} WHERE key=? AND chain=?", (key, chain), ) result = await cursor.fetchone() + await cursor.close() if result is not None: return pickle.loads(result[0]) except (pickle.PickleError, sqlite3.Error) as e: logger.exception("Cache error", exc_info=e) pass - - result = await func(*args, **kwargs) + result = await func(other_self, *args, **kwargs) if not local_chain or not USE_CACHE: # TODO use a task here await self._db.execute( f"INSERT OR REPLACE INTO {table_name} (key, value, chain) VALUES (?,?,?)", (key, pickle.dumps(result), chain), ) + await self._db.commit() return result @@ -200,7 +202,7 @@ def decorator(func): @cached_fetcher(max_size=maxsize) async def inner(self, *args, **kwargs): async_sql_db = AsyncSqliteDB(self.url) - result = await async_sql_db(self.url, func, args, kwargs) + result = await async_sql_db(self.url, self, func, args, kwargs) return result return inner From e33a0ebe16867b6190425c3c1ec210724dafe6af Mon Sep 17 00:00:00 2001 From: bdhimes Date: Wed, 13 Aug 2025 20:00:26 +0200 Subject: [PATCH 3/5] PR comments --- async_substrate_interface/utils/cache.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py index 95a3033..5cf1fe4 100644 --- a/async_substrate_interface/utils/cache.py +++ b/async_substrate_interface/utils/cache.py @@ -27,24 +27,28 @@ class AsyncSqliteDB: _instances: dict[str, "AsyncSqliteDB"] = {} _db: Optional[aiosqlite.Connection] = None + _lock: Optional[asyncio.Lock] = None def __new__(cls, chain_endpoint: str): try: return cls._instances[chain_endpoint] except KeyError: instance = super().__new__(cls) + instance._lock = asyncio.Lock() cls._instances[chain_endpoint] = instance return instance async def 
__call__(self, chain, other_self, func, args, kwargs) -> Optional[Any]: - if not self._db: - _ensure_dir() - self._db = await aiosqlite.connect(CACHE_LOCATION) + async with self._lock: + if not self._db: + _ensure_dir() + self._db = await aiosqlite.connect(CACHE_LOCATION) table_name = _get_table_name(func) key = None if not (local_chain := _check_if_local(chain)) or not USE_CACHE: await self._db.execute( - f"""CREATE TABLE IF NOT EXISTS {table_name} + f""" + CREATE TABLE IF NOT EXISTS {table_name} ( rowid INTEGER PRIMARY KEY AUTOINCREMENT, key BLOB, @@ -52,10 +56,11 @@ async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any] chain TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); - """ + """ ) await self._db.execute( - f"""CREATE TRIGGER IF NOT EXISTS prune_rows_trigger AFTER INSERT ON {table_name} + f""" + CREATE TRIGGER IF NOT EXISTS prune_rows_trigger_{table_name} AFTER INSERT ON {table_name} BEGIN DELETE FROM {table_name} WHERE rowid IN ( @@ -63,7 +68,8 @@ async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any] ORDER BY created_at DESC LIMIT -1 OFFSET 500 ); - END;""" + END; + """ ) await self._db.commit() key = pickle.dumps((args, kwargs or None)) From 4da8bded743e850502a1ad31988d09874eec2423 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Wed, 13 Aug 2025 20:07:49 +0200 Subject: [PATCH 4/5] no op From 958dd2e033cd83acfe2b1c766ae36e9e6f2c1fc7 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Wed, 13 Aug 2025 20:32:31 +0200 Subject: [PATCH 5/5] List out the env vars used by this lib --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index 4c662a7..17960c4 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,14 @@ the same on-disk cache. As with the other two caches, this also takes `SUBSTRATE_CACHE_METHOD_SIZE` and `SUBSTRATE_RUNTIME_CACHE_SIZE` env vars. 
+### ENV VARS +The following environment variables are used within async-substrate-interface: + - NO_CACHE (default 0): if set to 1, when using the DiskCachedAsyncSubstrateInterface class, no persistent on-disk cache will be stored, and only the in-memory cache will be used. + - CACHE_LOCATION (default `~/.cache/async-substrate-interface`): this determines the location for the cache file, if using DiskCachedAsyncSubstrateInterface + - SUBSTRATE_CACHE_METHOD_SIZE (default 512): the cache size (either in-memory or on-disk) of the smaller return-size methods (see the Caching section for more info) + - SUBSTRATE_RUNTIME_CACHE_SIZE (default 16): the cache size (either in-memory or on-disk) of the larger return-size methods (see the Caching section for more info) + + ## Contributing Contributions are welcome! Please open an issue or submit a pull request to the `staging` branch.