From ea08a69a1ba3330de0f4da3b77d98abbc265b334 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Mon, 27 Oct 2025 23:09:25 +0200 Subject: [PATCH 01/11] WIP --- async_substrate_interface/async_substrate.py | 12 ++- async_substrate_interface/types.py | 43 ++++++++++ async_substrate_interface/utils/cache.py | 90 ++++++++++++++++---- 3 files changed, 126 insertions(+), 19 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index c2342e8..a4e0e94 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -1091,7 +1091,10 @@ async def __aenter__(self): await self.initialize() return self - async def initialize(self): + async def initialize(self) -> None: + await self._initialize() + + async def _initialize(self) -> None: """ Initialize the connection to the chain. """ @@ -1116,7 +1119,7 @@ async def initialize(self): self._initializing = False async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.ws.shutdown() + await self.close() @property def metadata(self): @@ -4260,11 +4263,16 @@ class DiskCachedAsyncSubstrateInterface(AsyncSubstrateInterface): Experimental new class that uses disk-caching in addition to memory-caching for the cached methods """ + async def initialize(self) -> None: + await self.runtime_cache.load_from_disk(self.url) + await self._initialize() + async def close(self): """ Closes the substrate connection, and the websocket connection. """ try: + await self.runtime_cache.dump_to_disk(self.url) await self.ws.shutdown() except AttributeError: pass diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index 008dd48..3e63852 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -16,6 +16,7 @@ from .const import SS58_FORMAT from .utils import json +from .utils.cache import AsyncSqliteDB logger = logging.getLogger("async_substrate_interface") @@ -101,6 +102,21 @@ def retrieve( return runtime return None + async def load_from_disk(self, chain_endpoint: str): + db = AsyncSqliteDB(chain_endpoint=chain_endpoint) + block_mapping, block_hash_mapping, runtime_version_mapping = await db.load_runtime_cache(chain_endpoint) + if not any([block_mapping, block_hash_mapping, runtime_version_mapping]): + logger.debug("No runtime mappings in disk cache") + else: + logger.debug("Found runtime mappings in disk cache") + self.blocks = {x: Runtime.deserialize(y) for x, y in block_mapping.items()} + self.block_hashes = {x: Runtime.deserialize(y) for x, y in block_hash_mapping.items()} + self.versions = {x: Runtime.deserialize(y) for x, y in runtime_version_mapping.items()} + + async def dump_to_disk(self, chain_endpoint: str): + db = AsyncSqliteDB(chain_endpoint=chain_endpoint) + await db.dump_runtime_cache(chain_endpoint, self.blocks, self.block_hashes, self.versions) + class Runtime: """ @@ -149,6 +165,33 @@ def __init__( if registry is not None: self.load_registry_type_map() + def serialize(self): + return { + "chain": self.chain, + "type_registry": self.type_registry, + "metadata": self.metadata, + "metadata_v15": self.metadata_v15.to_json() if self.metadata_v15 is not None else None, + "runtime_info": self.runtime_info, + "registry": None, # gets loaded from metadata_v15 + "ss58_format": self.ss58_format, + "runtime_config": self.runtime_config, + } + + @classmethod + def deserialize(cls, serialized: dict) -> "Runtime": + mdv15 = MetadataV15 + registry = PortableRegistry.from_metadata_v15(mdv15) if (mdv15 
:= serialized["metadata_v15"]) else None + return cls( + chain=serialized["chain"], + metadata=serialized["metadata"], + type_registry=serialized["type_registry"], + runtime_config=serialized["runtime_config"], + metadata_v15=mdv15 if mdv15 is not None else None, + registry=registry, + ss58_format=serialized["ss58_format"], + runtime_info=serialized["runtime_info"] + ) + def load_runtime(self): """ Initial loading of the runtime's type registry information. diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py index 5cf1fe4..3f15e89 100644 --- a/async_substrate_interface/utils/cache.py +++ b/async_substrate_interface/utils/cache.py @@ -4,12 +4,12 @@ import functools import logging import os -import pickle import sqlite3 from pathlib import Path from typing import Callable, Any, Awaitable, Hashable, Optional import aiosqlite +import dill as pickle USE_CACHE = True if os.getenv("NO_CACHE") != "1" else False @@ -38,13 +38,11 @@ def __new__(cls, chain_endpoint: str): cls._instances[chain_endpoint] = instance return instance - async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any]: + async def _create_if_not_exists(self, chain: str, table_name: str): async with self._lock: if not self._db: _ensure_dir() self._db = await aiosqlite.connect(CACHE_LOCATION) - table_name = _get_table_name(func) - key = None if not (local_chain := _check_if_local(chain)) or not USE_CACHE: await self._db.execute( f""" @@ -72,19 +70,24 @@ async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any] """ ) await self._db.commit() - key = pickle.dumps((args, kwargs or None)) - try: - cursor: aiosqlite.Cursor = await self._db.execute( - f"SELECT value FROM {table_name} WHERE key=? AND chain=?", - (key, chain), - ) - result = await cursor.fetchone() - await cursor.close() - if result is not None: - return pickle.loads(result[0]) - except (pickle.PickleError, sqlite3.Error) as e: - logger.exception("Cache error", exc_info=e) - pass + return local_chain + + async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any]: + table_name = _get_table_name(func) + local_chain = await self._create_if_not_exists(chain, table_name) + key = pickle.dumps((args, kwargs or None)) + try: + cursor: aiosqlite.Cursor = await self._db.execute( + f"SELECT value FROM {table_name} WHERE key=? 
AND chain=?", + (key, chain), + ) + result = await cursor.fetchone() + await cursor.close() + if result is not None: + return pickle.loads(result[0]) + except (pickle.PickleError, sqlite3.Error) as e: + logger.exception("Cache error", exc_info=e) + pass result = await func(other_self, *args, **kwargs) if not local_chain or not USE_CACHE: # TODO use a task here @@ -95,6 +98,59 @@ async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any] await self._db.commit() return result + async def load_runtime_cache(self, chain: str) -> tuple[dict, dict, dict]: + block_mapping = {} + block_hash_mapping = {} + version_mapping = {} + tables = { + "rt_cache_block": block_mapping, + "rt_cache_block_hash": block_hash_mapping, + "rt_cache_version": version_mapping + } + for table in tables.keys(): + local_chain = await self._create_if_not_exists(chain, table) + if local_chain: + return {}, {}, {} + for table_name, mapping in tables.items(): + try: + cursor: aiosqlite.Cursor = await self._db.execute( + f"SELECT key, value FROM {table_name} WHERE chain=?", + (chain,), + ) + results = await cursor.fetchall() + await cursor.close() + if results is None: + continue + for row in results: + key, value = row + runtime = pickle.loads(value) + mapping[key] = runtime + except (pickle.PickleError, sqlite3.Error) as e: + logger.exception("Cache error", exc_info=e) + return {}, {}, {} + return block_mapping, block_hash_mapping, version_mapping + + async def dump_runtime_cache(self, chain: str, block_mapping: dict, block_hash_mapping: dict, version_mapping: dict) -> None: + async with self._lock: + if not self._db: + _ensure_dir() + self._db = await aiosqlite.connect(CACHE_LOCATION) + tables = { + "rt_cache_block": block_mapping, + "rt_cache_block_hash": block_hash_mapping, + "rt_cache_version": version_mapping + } + for table, mapping in tables.items(): + local_chain = await self._create_if_not_exists(chain, table) + if local_chain: + return None + await self._db.executemany( + f"INSERT OR REPLACE INTO {table} (key, value, chain) VALUES (?,?,?)", + [(key, pickle.dumps(runtime.serialize()), chain) for key, runtime in mapping.items()], + ) + await self._db.commit() + return None + def _ensure_dir(): path = Path(CACHE_LOCATION).parent From c4fcacad9589ea282e82f0e845bea298e577e8f9 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 28 Oct 2025 15:21:20 +0200 Subject: [PATCH 02/11] Cleaner runtime cache update --- async_substrate_interface/types.py | 45 ++++++++++++++---------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index 3e63852..e4c0b2c 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -2,6 +2,7 @@ from abc import ABC from collections import defaultdict, deque from collections.abc import Iterable +from contextlib import suppress from dataclasses import dataclass from datetime import datetime from typing import Optional, Union, Any @@ -35,8 +36,8 @@ class RuntimeCache: is important you are utilizing the correct version. """ - blocks: dict[int, "Runtime"] - block_hashes: dict[str, "Runtime"] + blocks: dict[int, str] + block_hashes: dict[str, int] versions: dict[int, "Runtime"] last_used: Optional["Runtime"] @@ -57,10 +58,10 @@ def add_item( Adds a Runtime object to the cache mapped to its version, block number, and/or block hash. 
""" self.last_used = runtime - if block is not None: - self.blocks[block] = runtime - if block_hash is not None: - self.block_hashes[block_hash] = runtime + if block is not None and block_hash is not None: + self.blocks[block] = block_hash + if block_hash is not None and runtime_version is not None: + self.block_hashes[block_hash] = runtime_version if runtime_version is not None: self.versions[runtime_version] = runtime @@ -74,33 +75,29 @@ def retrieve( Retrieves a Runtime object from the cache, using the key of its block number, block hash, or runtime version. Retrieval happens in this order. If no Runtime is found mapped to any of your supplied keys, returns `None`. """ + runtime = None if block is not None: - runtime = self.blocks.get(block) - if runtime is not None: - if block_hash is not None: - # if lookup occurs for block_hash and block, but only block matches, also map to block_hash - self.add_item(runtime, block_hash=block_hash) + if block_hash is not None: + self.blocks[block] = block_hash + if runtime_version is not None: + self.block_hashes[block_hash] = runtime_version + with suppress(KeyError): + runtime = self.versions[self.block_hashes[self.blocks[block]]] self.last_used = runtime return runtime if block_hash is not None: - runtime = self.block_hashes.get(block_hash) - if runtime is not None: - if block is not None: - # if lookup occurs for block_hash and block, but only block_hash matches, also map to block - self.add_item(runtime, block=block) + if runtime_version is not None: + self.block_hashes[block_hash] = runtime_version + with suppress(KeyError): + runtime = self.versions[self.block_hashes[block_hash]] self.last_used = runtime return runtime if runtime_version is not None: - runtime = self.versions.get(runtime_version) - if runtime is not None: - # if runtime_version matches, also map to block and block_hash (if supplied) - if block is not None: - self.add_item(runtime, block=block) - if block_hash is not None: - self.add_item(runtime, block_hash=block_hash) + with suppress(KeyError): + runtime = self.versions[runtime_version] self.last_used = runtime return runtime - return None + return runtime async def load_from_disk(self, chain_endpoint: str): db = AsyncSqliteDB(chain_endpoint=chain_endpoint) From c7a93a57797e91c75fefe6b9188d2fc2a3c63c9b Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 28 Oct 2025 16:31:32 +0200 Subject: [PATCH 03/11] Good stopping point until bt-decode is updated. 
--- async_substrate_interface/async_substrate.py | 1 - async_substrate_interface/utils/cache.py | 91 +++++++++++++------- 2 files changed, 62 insertions(+), 30 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index a4e0e94..5e5ac03 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -2402,7 +2402,6 @@ async def get_block_metadata( "MetadataVersioned", data=ScaleBytes(result) ) metadata_decoder.decode() - return metadata_decoder else: return result diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py index 3f15e89..e55a9d6 100644 --- a/async_substrate_interface/utils/cache.py +++ b/async_substrate_interface/utils/cache.py @@ -38,11 +38,13 @@ def __new__(cls, chain_endpoint: str): cls._instances[chain_endpoint] = instance return instance - async def _create_if_not_exists(self, chain: str, table_name: str): + async def close(self): async with self._lock: - if not self._db: - _ensure_dir() - self._db = await aiosqlite.connect(CACHE_LOCATION) + if self._db: + print(44) + await self._db.close() + + async def _create_if_not_exists(self, chain: str, table_name: str): if not (local_chain := _check_if_local(chain)) or not USE_CACHE: await self._db.execute( f""" @@ -73,6 +75,10 @@ async def _create_if_not_exists(self, chain: str, table_name: str): return local_chain async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any]: + async with self._lock: + if not self._db: + _ensure_dir() + self._db = await aiosqlite.connect(CACHE_LOCATION) table_name = _get_table_name(func) local_chain = await self._create_if_not_exists(chain, table_name) key = pickle.dumps((args, kwargs or None)) @@ -99,26 +105,32 @@ async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any] return result async def load_runtime_cache(self, chain: str) -> tuple[dict, dict, dict]: + async with self._lock: + if not self._db: + _ensure_dir() + self._db = await aiosqlite.connect(CACHE_LOCATION) block_mapping = {} block_hash_mapping = {} version_mapping = {} tables = { - "rt_cache_block": block_mapping, - "rt_cache_block_hash": block_hash_mapping, - "rt_cache_version": version_mapping + "RuntimeCache_blocks": block_mapping, + "RuntimeCache_block_hashes": block_hash_mapping, + "RuntimeCache_versions": version_mapping, } for table in tables.keys(): - local_chain = await self._create_if_not_exists(chain, table) + async with self._lock: + local_chain = await self._create_if_not_exists(chain, table) if local_chain: return {}, {}, {} for table_name, mapping in tables.items(): try: - cursor: aiosqlite.Cursor = await self._db.execute( - f"SELECT key, value FROM {table_name} WHERE chain=?", - (chain,), - ) - results = await cursor.fetchall() - await cursor.close() + async with self._lock: + cursor: aiosqlite.Cursor = await self._db.execute( + f"SELECT key, value FROM {table_name} WHERE chain=?", + (chain,), + ) + results = await cursor.fetchall() + await cursor.close() if results is None: continue for row in results: @@ -130,26 +142,47 @@ async def load_runtime_cache(self, chain: str) -> tuple[dict, dict, dict]: return {}, {}, {} return block_mapping, block_hash_mapping, version_mapping - async def dump_runtime_cache(self, chain: str, block_mapping: dict, block_hash_mapping: dict, version_mapping: dict) -> None: + async def dump_runtime_cache( + self, + chain: str, + block_mapping: dict, + block_hash_mapping: dict, + version_mapping: 
dict, + ) -> None: async with self._lock: if not self._db: _ensure_dir() self._db = await aiosqlite.connect(CACHE_LOCATION) - tables = { - "rt_cache_block": block_mapping, - "rt_cache_block_hash": block_hash_mapping, - "rt_cache_version": version_mapping - } - for table, mapping in tables.items(): - local_chain = await self._create_if_not_exists(chain, table) - if local_chain: - return None - await self._db.executemany( - f"INSERT OR REPLACE INTO {table} (key, value, chain) VALUES (?,?,?)", - [(key, pickle.dumps(runtime.serialize()), chain) for key, runtime in mapping.items()], - ) + + tables = { + "RuntimeCache_blocks": block_mapping, + "RuntimeCache_block_hashes": block_hash_mapping, + "RuntimeCache_versions": version_mapping, + } + for table, mapping in tables.items(): + local_chain = await self._create_if_not_exists(chain, table) + if local_chain: + return None + + for key, value in mapping.items(): + if not isinstance(value, (str, int)): + serialized_runtime = pickle.dumps(value.serialize()) + else: + serialized_runtime = pickle.dumps(value) + + await self._db.execute( + f"INSERT OR REPLACE INTO {table} (key, value, chain) VALUES (?,?,?)", + (key, serialized_runtime, chain), + ) + + # await self._db.executemany( + # f"INSERT OR REPLACE INTO {table} (key, value, chain) VALUES (?,?,?)", + # [(key, pickle.dumps(runtime.serialize()), chain) for key, runtime in mapping.items()], + # ) + await self._db.commit() - return None + + return None def _ensure_dir(): From 321c8273118b7042dae7de7fee3b7d19a58cd899 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 28 Oct 2025 17:09:30 +0200 Subject: [PATCH 04/11] Good stopping point until bt-decode is updated. --- async_substrate_interface/async_substrate.py | 6 +-- async_substrate_interface/types.py | 52 +++++++++++++------- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 5e5ac03..ffbf5c9 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -4272,12 +4272,12 @@ async def close(self): """ try: await self.runtime_cache.dump_to_disk(self.url) + print(4276) await self.ws.shutdown() except AttributeError: pass - db_conn = AsyncSqliteDB(self.url) - if db_conn._db is not None: - await db_conn._db.close() + for db in AsyncSqliteDB._instances.values(): + await db.close() @async_sql_lru_cache(maxsize=SUBSTRATE_CACHE_METHOD_SIZE) async def get_parent_block_hash(self, block_hash): diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index e4c0b2c..dc2637f 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -101,18 +101,26 @@ def retrieve( async def load_from_disk(self, chain_endpoint: str): db = AsyncSqliteDB(chain_endpoint=chain_endpoint) - block_mapping, block_hash_mapping, runtime_version_mapping = await db.load_runtime_cache(chain_endpoint) + ( + block_mapping, + block_hash_mapping, + runtime_version_mapping, + ) = await db.load_runtime_cache(chain_endpoint) if not any([block_mapping, block_hash_mapping, runtime_version_mapping]): logger.debug("No runtime mappings in disk cache") else: logger.debug("Found runtime mappings in disk cache") - self.blocks = {x: Runtime.deserialize(y) for x, y in block_mapping.items()} - self.block_hashes = {x: Runtime.deserialize(y) for x, y in block_hash_mapping.items()} - self.versions = {x: Runtime.deserialize(y) for x, y in runtime_version_mapping.items()} + self.blocks = 
block_mapping + self.block_hashes = block_hash_mapping + self.versions = { + x: Runtime.deserialize(y) for x, y in runtime_version_mapping.items() + } async def dump_to_disk(self, chain_endpoint: str): db = AsyncSqliteDB(chain_endpoint=chain_endpoint) - await db.dump_runtime_cache(chain_endpoint, self.blocks, self.block_hashes, self.versions) + await db.dump_runtime_cache( + chain_endpoint, self.blocks, self.block_hashes, self.versions + ) class Runtime: @@ -163,30 +171,40 @@ def __init__( self.load_registry_type_map() def serialize(self): + metadata_value = self.metadata.data.data return { "chain": self.chain, "type_registry": self.type_registry, - "metadata": self.metadata, - "metadata_v15": self.metadata_v15.to_json() if self.metadata_v15 is not None else None, - "runtime_info": self.runtime_info, - "registry": None, # gets loaded from metadata_v15 + "metadata_value": metadata_value, + "metadata_v15": None, # TODO new bt-decode + "runtime_info": { + "specVersion": self.runtime_version, + "transactionVersion": self.transaction_version, + }, + "registry": self.registry.registry if self.registry is not None else None, "ss58_format": self.ss58_format, - "runtime_config": self.runtime_config, } @classmethod def deserialize(cls, serialized: dict) -> "Runtime": - mdv15 = MetadataV15 - registry = PortableRegistry.from_metadata_v15(mdv15) if (mdv15 := serialized["metadata_v15"]) else None + ss58_format = serialized["ss58_format"] + runtime_config = RuntimeConfigurationObject(ss58_format=ss58_format) + runtime_config.clear_type_registry() + runtime_config.update_type_registry(load_type_registry_preset(name="core")) + metadata = runtime_config.create_scale_object( + "MetadataVersioned", data=ScaleBytes(serialized["metadata_value"]) + ) + metadata.decode() + registry = PortableRegistry.from_json(serialized["registry"]) return cls( chain=serialized["chain"], - metadata=serialized["metadata"], + metadata=metadata, type_registry=serialized["type_registry"], - runtime_config=serialized["runtime_config"], - metadata_v15=mdv15 if mdv15 is not None else None, + runtime_config=runtime_config, + metadata_v15=None, registry=registry, - ss58_format=serialized["ss58_format"], - runtime_info=serialized["runtime_info"] + ss58_format=ss58_format, + runtime_info=serialized["runtime_info"], ) def load_runtime(self): From c75d332861cd1acf92be8fb672f6624d5a9546c8 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 28 Oct 2025 23:36:00 +0200 Subject: [PATCH 05/11] Working! 
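
bt-decode v0.8.0 exposes `encode_to_metadata_option()` and
`decode_from_metadata_option()`, so `Runtime.serialize()` can now persist
`metadata_v15` instead of the earlier `None` placeholder. The cache tables
also gain a `UNIQUE(key, chain)` constraint: without it, `INSERT OR REPLACE`
never detects a conflict and simply appends duplicate rows; with it, the
statement acts as an upsert keyed on (key, chain).

A standard-library sketch of that upsert behaviour (illustrative only, not
part of this patch):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE t (key BLOB, value BLOB, chain TEXT, UNIQUE(key, chain))"
    )
    for value in (b"first", b"second"):
        # with UNIQUE(key, chain) the second insert replaces the first row;
        # without the constraint, both rows would be kept
        conn.execute(
            "INSERT OR REPLACE INTO t (key, value, chain) VALUES (?, ?, ?)",
            (b"k", value, "ws://fake.com"),
        )
    assert conn.execute("SELECT COUNT(*) FROM t").fetchone()[0] == 1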
--- async_substrate_interface/types.py | 4 ++-- async_substrate_interface/utils/cache.py | 6 ++++-- pyproject.toml | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index dc2637f..b6883c4 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -176,7 +176,7 @@ def serialize(self): "chain": self.chain, "type_registry": self.type_registry, "metadata_value": metadata_value, - "metadata_v15": None, # TODO new bt-decode + "metadata_v15": self.metadata_v15.encode_to_metadata_option(), "runtime_info": { "specVersion": self.runtime_version, "transactionVersion": self.transaction_version, @@ -201,7 +201,7 @@ def deserialize(cls, serialized: dict) -> "Runtime": metadata=metadata, type_registry=serialized["type_registry"], runtime_config=runtime_config, - metadata_v15=None, + metadata_v15=MetadataV15.decode_from_metadata_option(serialized["metadata_v15"]), registry=registry, ss58_format=ss58_format, runtime_info=serialized["runtime_info"], diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py index e55a9d6..0dcc56c 100644 --- a/async_substrate_interface/utils/cache.py +++ b/async_substrate_interface/utils/cache.py @@ -54,7 +54,8 @@ async def _create_if_not_exists(self, chain: str, table_name: str): key BLOB, value BLOB, chain TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(key, chain) ); """ ) @@ -208,7 +209,8 @@ def _create_table(c, conn, table_name): key BLOB, value BLOB, chain TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(key, chain) ); """ ) diff --git a/pyproject.toml b/pyproject.toml index ddb2b89..2841365 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ keywords = ["substrate", "development", "bittensor"] dependencies = [ "wheel", - "bt-decode==v0.6.0", + "bt-decode==v0.8.0", "scalecodec~=1.2.11", "websockets>=14.1", "xxhash", From 68a8e164a5741c89cf9e777925e4e16b479d53fe Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 28 Oct 2025 23:36:18 +0200 Subject: [PATCH 06/11] Ruff --- async_substrate_interface/types.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index b6883c4..54611c5 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -201,7 +201,9 @@ def deserialize(cls, serialized: dict) -> "Runtime": metadata=metadata, type_registry=serialized["type_registry"], runtime_config=runtime_config, - metadata_v15=MetadataV15.decode_from_metadata_option(serialized["metadata_v15"]), + metadata_v15=MetadataV15.decode_from_metadata_option( + serialized["metadata_v15"] + ), registry=registry, ss58_format=ss58_format, runtime_info=serialized["runtime_info"], From 66fe45df235af77436ba8bd17fbf2569421e6913 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 28 Oct 2025 23:39:58 +0200 Subject: [PATCH 07/11] Dependencies --- async_substrate_interface/utils/cache.py | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py index 0dcc56c..f7561cd 100644 --- a/async_substrate_interface/utils/cache.py +++ b/async_substrate_interface/utils/cache.py @@ -4,12 +4,12 @@ import functools import logging import os +import pickle import sqlite3 from pathlib import 
Path from typing import Callable, Any, Awaitable, Hashable, Optional import aiosqlite -import dill as pickle USE_CACHE = True if os.getenv("NO_CACHE") != "1" else False diff --git a/pyproject.toml b/pyproject.toml index 2841365..e57910c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,11 +8,11 @@ keywords = ["substrate", "development", "bittensor"] dependencies = [ "wheel", + "aiosqlite>=0.21.0,<1.0.0" "bt-decode==v0.8.0", "scalecodec~=1.2.11", "websockets>=14.1", "xxhash", - "aiosqlite>=0.21.0,<1.0.0" ] requires-python = ">=3.9,<3.14" From ca85ff9bbe105f4fd1c2d65fdb5c820e5e45cf9b Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 28 Oct 2025 23:45:00 +0200 Subject: [PATCH 08/11] Clean up debug --- async_substrate_interface/async_substrate.py | 5 ++-- async_substrate_interface/utils/cache.py | 26 +++++++++----------- pyproject.toml | 2 +- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 3d190af..f584e8a 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -4183,12 +4183,11 @@ async def close(self): """ try: await self.runtime_cache.dump_to_disk(self.url) - print(4276) await self.ws.shutdown() except AttributeError: pass - for db in AsyncSqliteDB._instances.values(): - await db.close() + db = AsyncSqliteDB(self.url) + await db.close() @async_sql_lru_cache(maxsize=SUBSTRATE_CACHE_METHOD_SIZE) async def get_parent_block_hash(self, block_hash): diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py index f7561cd..ddf7797 100644 --- a/async_substrate_interface/utils/cache.py +++ b/async_substrate_interface/utils/cache.py @@ -41,7 +41,6 @@ def __new__(cls, chain_endpoint: str): async def close(self): async with self._lock: if self._db: - print(44) await self._db.close() async def _create_if_not_exists(self, chain: str, table_name: str): @@ -164,22 +163,21 @@ async def dump_runtime_cache( local_chain = await self._create_if_not_exists(chain, table) if local_chain: return None - + serialized_mapping = {} for key, value in mapping.items(): if not isinstance(value, (str, int)): - serialized_runtime = pickle.dumps(value.serialize()) + serialized_value = pickle.dumps(value.serialize()) else: - serialized_runtime = pickle.dumps(value) - - await self._db.execute( - f"INSERT OR REPLACE INTO {table} (key, value, chain) VALUES (?,?,?)", - (key, serialized_runtime, chain), - ) - - # await self._db.executemany( - # f"INSERT OR REPLACE INTO {table} (key, value, chain) VALUES (?,?,?)", - # [(key, pickle.dumps(runtime.serialize()), chain) for key, runtime in mapping.items()], - # ) + serialized_value = pickle.dumps(value) + serialized_mapping[key] = serialized_value + + await self._db.executemany( + f"INSERT OR REPLACE INTO {table} (key, value, chain) VALUES (?,?,?)", + [ + (key, serialized_value_, chain) + for key, serialized_value_ in serialized_mapping.items() + ], + ) await self._db.commit() diff --git a/pyproject.toml b/pyproject.toml index e95caf3..14a1b31 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ keywords = ["substrate", "development", "bittensor"] dependencies = [ "wheel", - "aiosqlite>=0.21.0,<1.0.0" + "aiosqlite>=0.21.0,<1.0.0", "bt-decode==v0.8.0", "scalecodec~=1.2.11", "websockets>=14.1", From bd6ef67c553618c99e37237c1778cd6dc2da23f6 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 28 Oct 2025 23:59:12 +0200 Subject: [PATCH 09/11] Fix test --- 
 tests/unit_tests/test_types.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/unit_tests/test_types.py b/tests/unit_tests/test_types.py
index 7292177..14ef45a 100644
--- a/tests/unit_tests/test_types.py
+++ b/tests/unit_tests/test_types.py
@@ -72,13 +72,15 @@ def test_runtime_cache():
     # cache does not yet know that new_fake_block has the same runtime
     assert runtime_cache.retrieve(new_fake_block) is None
     assert (
-        runtime_cache.retrieve(new_fake_block, runtime_version=fake_version) is not None
+        runtime_cache.retrieve(
+            new_fake_block, new_fake_hash, runtime_version=fake_version
+        )
+        is not None
     )
     # after checking the runtime with the new block, it now knows this runtime should also map to this block
     assert runtime_cache.retrieve(new_fake_block) is not None
     assert runtime_cache.retrieve(newer_fake_block) is None
     assert runtime_cache.retrieve(newer_fake_block, fake_hash) is not None
     assert runtime_cache.retrieve(newer_fake_block) is not None
-    assert runtime_cache.retrieve(block_hash=new_fake_hash) is None
     assert runtime_cache.retrieve(fake_block, block_hash=new_fake_hash) is not None
     assert runtime_cache.retrieve(block_hash=new_fake_hash) is not None

From 16441c78092d2283dcfa109c93bf007098320a7d Mon Sep 17 00:00:00 2001
From: bdhimes
Date: Wed, 29 Oct 2025 16:19:45 +0200
Subject: [PATCH 10/11] Implement improvements from #205

---
 async_substrate_interface/utils/cache.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py
index ddf7797..9858cd8 100644
--- a/async_substrate_interface/utils/cache.py
+++ b/async_substrate_interface/utils/cache.py
@@ -20,6 +20,7 @@
     if USE_CACHE
     else ":memory:"
 )
+SUBSTRATE_CACHE_METHOD_SIZE = int(os.getenv("SUBSTRATE_CACHE_METHOD_SIZE", "512"))

 logger = logging.getLogger("async_substrate_interface")

@@ -66,7 +67,7 @@ async def _create_if_not_exists(self, chain: str, table_name: str):
                     WHERE rowid IN (
                         SELECT rowid FROM {table_name}
                         ORDER BY created_at DESC
-                        LIMIT -1 OFFSET 500
+                        LIMIT -1 OFFSET {SUBSTRATE_CACHE_METHOD_SIZE}
                     );
                 END;
                 """
@@ -219,7 +220,7 @@ def _create_table(c, conn, table_name):
             WHERE rowid IN (
                 SELECT rowid FROM {table_name}
                 ORDER BY created_at DESC
-                LIMIT -1 OFFSET 500
+                LIMIT -1 OFFSET {SUBSTRATE_CACHE_METHOD_SIZE}
             );
         END;"""
         )
@@ -294,7 +295,7 @@ def inner(self, *args, **kwargs):

 def async_sql_lru_cache(maxsize: Optional[int] = None):
     def decorator(func):
-        @cached_fetcher(max_size=maxsize)
+        @cached_fetcher(max_size=maxsize, cache_key_index=None)
         async def inner(self, *args, **kwargs):
             async_sql_db = AsyncSqliteDB(self.url)
             result = await async_sql_db(self.url, self, func, args, kwargs)
@@ -442,7 +443,7 @@ def __get__(self, instance, owner):
         return self._instances[instance]


-def cached_fetcher(max_size: Optional[int] = None, cache_key_index: int = 0):
+def cached_fetcher(max_size: Optional[int] = None, cache_key_index: Optional[int] = 0):
     """Wrapper for CachedFetcher.
See example in CachedFetcher docstring.""" def wrapper(method): From d7b5060791d093658cc5bcdc2d838f1255c8286b Mon Sep 17 00:00:00 2001 From: bdhimes Date: Wed, 29 Oct 2025 16:59:12 +0200 Subject: [PATCH 11/11] Adds test --- async_substrate_interface/utils/cache.py | 1 + tests/unit_tests/test_types.py | 76 ++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py index 9858cd8..44521a5 100644 --- a/async_substrate_interface/utils/cache.py +++ b/async_substrate_interface/utils/cache.py @@ -43,6 +43,7 @@ async def close(self): async with self._lock: if self._db: await self._db.close() + self._db = None async def _create_if_not_exists(self, chain: str, table_name: str): if not (local_chain := _check_if_local(chain)) or not USE_CACHE: diff --git a/tests/unit_tests/test_types.py b/tests/unit_tests/test_types.py index 14ef45a..f2e13b4 100644 --- a/tests/unit_tests/test_types.py +++ b/tests/unit_tests/test_types.py @@ -1,4 +1,12 @@ from async_substrate_interface.types import ScaleObj, Runtime, RuntimeCache +from async_substrate_interface.async_substrate import DiskCachedAsyncSubstrateInterface +from async_substrate_interface.utils import cache + +import sqlite3 +import os +import pickle +import pytest +from unittest.mock import patch def test_scale_object(): @@ -84,3 +92,71 @@ def test_runtime_cache(): assert runtime_cache.retrieve(newer_fake_block) is not None assert runtime_cache.retrieve(fake_block, block_hash=new_fake_hash) is not None assert runtime_cache.retrieve(block_hash=new_fake_hash) is not None + + +@pytest.mark.asyncio +async def test_runtime_cache_from_disk(): + test_db_location = "/tmp/async-substrate-interface-test-cache" + fake_chain = "ws://fake.com" + fake_block = 1 + fake_hash = "0xignore" + new_fake_block = 2 + new_fake_hash = "0xnewfakehash" + + if os.path.exists(test_db_location): + os.remove(test_db_location) + with patch.object(cache, "CACHE_LOCATION", test_db_location): + substrate = DiskCachedAsyncSubstrateInterface(fake_chain, _mock=True) + # Needed to avoid trying to initialize on the network during `substrate.initialize()` + substrate.initialized = True + + # runtime cache should be completely empty + assert substrate.runtime_cache.block_hashes == {} + assert substrate.runtime_cache.blocks == {} + assert substrate.runtime_cache.versions == {} + await substrate.initialize() + + # after initialization, runtime cache should still be completely empty + assert substrate.runtime_cache.block_hashes == {} + assert substrate.runtime_cache.blocks == {} + assert substrate.runtime_cache.versions == {} + await substrate.close() + + # ensure we have created the SQLite DB during initialize() + assert os.path.exists(test_db_location) + + # insert some fake data into our DB + conn = sqlite3.connect(test_db_location) + conn.execute( + "INSERT INTO RuntimeCache_blocks (key, value, chain) VALUES (?, ?, ?)", + (fake_block, pickle.dumps(fake_hash), fake_chain), + ) + conn.commit() + conn.close() + + substrate.initialized = True + await substrate.initialize() + assert substrate.runtime_cache.blocks == {fake_block: fake_hash} + # add an item to the cache + substrate.runtime_cache.add_item( + runtime=None, block_hash=new_fake_hash, block=new_fake_block + ) + await substrate.close() + + # verify that our added item is now in the DB + conn = sqlite3.connect(test_db_location) + cursor = conn.cursor() + cursor.execute("SELECT key, value, chain FROM RuntimeCache_blocks") + query = cursor.fetchall() + 
cursor.close() + conn.close() + + first_row = query[0] + assert first_row[0] == fake_block + assert pickle.loads(first_row[1]) == fake_hash + assert first_row[2] == fake_chain + + second_row = query[1] + assert second_row[0] == new_fake_block + assert pickle.loads(second_row[1]) == new_fake_hash + assert second_row[2] == fake_chain