11import asyncio
2+ import inspect
23from collections import OrderedDict
34import functools
5+ import logging
46import os
57import pickle
68import sqlite3
79from pathlib import Path
8- from typing import Callable , Any
9-
10- import asyncstdlib as a
11-
10+ from typing import Callable , Any , Awaitable , Hashable , Optional
1211
1312USE_CACHE = True if os .getenv ("NO_CACHE" ) != "1" else False
1413CACHE_LOCATION = (
1918 else ":memory:"
2019)
2120
21+ logger = logging .getLogger ("async_substrate_interface" )
22+
2223
2324def _ensure_dir ():
2425 path = Path (CACHE_LOCATION ).parent
@@ -70,7 +71,7 @@ def _retrieve_from_cache(c, table_name, key, chain):
7071 if result is not None :
7172 return pickle .loads (result [0 ])
7273 except (pickle .PickleError , sqlite3 .Error ) as e :
73- print ( f "Cache error: { str ( e ) } " )
74+ logger . exception ( "Cache error" , exc_info = e )
7475 pass
7576
7677
@@ -82,7 +83,7 @@ def _insert_into_cache(c, conn, table_name, key, result, chain):
8283 )
8384 conn .commit ()
8485 except (pickle .PickleError , sqlite3 .Error ) as e :
85- print ( f "Cache error: { str ( e ) } " )
86+ logger . exception ( "Cache error" , exc_info = e )
8687 pass
8788
8889
@@ -128,7 +129,7 @@ def inner(self, *args, **kwargs):
128129
129130def async_sql_lru_cache (maxsize = None ):
130131 def decorator (func ):
131- @a . lru_cache ( maxsize = maxsize )
132+ @cached_fetcher ( max_size = maxsize )
132133 async def inner (self , * args , ** kwargs ):
133134 c , conn , table_name , key , result , chain , local_chain = (
134135 _shared_inner_fn_logic (func , self , args , kwargs )
@@ -147,6 +148,10 @@ async def inner(self, *args, **kwargs):
147148
148149
149150class LRUCache :
151+ """
152+ Basic Least-Recently-Used Cache, with simple methods `set` and `get`
153+ """
154+
150155 def __init__ (self , max_size : int ):
151156 self .max_size = max_size
152157 self .cache = OrderedDict ()
@@ -167,31 +172,121 @@ def get(self, key):
167172
168173
169174class CachedFetcher :
170- def __init__ (self , max_size : int , method : Callable ):
171- self ._inflight : dict [int , asyncio .Future ] = {}
175+ """
176+ Async caching class that allows the standard async LRU cache system, but also allows for concurrent
177+ asyncio calls (with the same args) to use the same result of a single call.
178+
179+ This should only be used for asyncio calls where the result is immutable.
180+
181+ Concept and usage:
182+ ```
183+ async def fetch(self, block_hash: str) -> str:
184+ return await some_resource(block_hash)
185+
186+ a1, a2, b = await asyncio.gather(fetch("a"), fetch("a"), fetch("b"))
187+ ```
188+
189+ Here, you are making three requests, but you really only need to make two I/O requests
190+ (one for "a", one for "b"), and while you wouldn't typically make a request like this directly, it's very
191+ common in using this library to inadvertently make these requests y gathering multiple resources that depend
192+ on the calls like this under the hood.
193+
194+ By using
195+
196+ ```
197+ @cached_fetcher(max_size=512)
198+ async def fetch(self, block_hash: str) -> str:
199+ return await some_resource(block_hash)
200+
201+ a1, a2, b = await asyncio.gather(fetch("a"), fetch("a"), fetch("b"))
202+ ```
203+
204+ You are only making two I/O calls, and a2 will simply use the result of a1 when it lands.
205+ """
206+
207+ def __init__ (
208+ self ,
209+ max_size : int ,
210+ method : Callable [..., Awaitable [Any ]],
211+ cache_key_index : Optional [int ] = 0 ,
212+ ):
213+ """
214+ Args:
215+ max_size: max size of the cache (in items)
216+ method: the function to cache
217+ cache_key_index: if the method takes multiple args, this is the index of that cache key in the args list
218+ (default is the first arg). By setting this to `None`, it will use all args as the cache key.
219+ """
220+ self ._inflight : dict [Hashable , asyncio .Future ] = {}
172221 self ._method = method
173222 self ._cache = LRUCache (max_size = max_size )
223+ self ._cache_key_index = cache_key_index
174224
175- async def execute (self , single_arg : Any ) -> str :
176- if item := self ._cache .get (single_arg ):
225+ def make_cache_key (self , args : tuple , kwargs : dict ) -> Hashable :
226+ bound = inspect .signature (self ._method ).bind (* args , ** kwargs )
227+ bound .apply_defaults ()
228+
229+ if self ._cache_key_index is not None :
230+ key_name = list (bound .arguments )[self ._cache_key_index ]
231+ return bound .arguments [key_name ]
232+
233+ return (tuple (bound .arguments .items ()),)
234+
235+ async def __call__ (self , * args : Any , ** kwargs : Any ) -> Any :
236+ key = self .make_cache_key (args , kwargs )
237+
238+ if item := self ._cache .get (key ):
177239 return item
178240
179- if single_arg in self ._inflight :
180- result = await self ._inflight [single_arg ]
181- return result
241+ if key in self ._inflight :
242+ return await self ._inflight [key ]
182243
183244 loop = asyncio .get_running_loop ()
184245 future = loop .create_future ()
185- self ._inflight [single_arg ] = future
246+ self ._inflight [key ] = future
186247
187248 try :
188- result = await self ._method (single_arg )
189- self ._cache .set (single_arg , result )
249+ result = await self ._method (* args , ** kwargs )
250+ self ._cache .set (key , result )
190251 future .set_result (result )
191252 return result
192253 except Exception as e :
193- # Propagate errors
194254 future .set_exception (e )
195255 raise
196256 finally :
197- self ._inflight .pop (single_arg , None )
257+ self ._inflight .pop (key , None )
258+
259+
260+ class _CachedFetcherMethod :
261+ """
262+ Helper class for using CachedFetcher with method caches (rather than functions)
263+ """
264+
265+ def __init__ (self , method , max_size : int , cache_key_index : int ):
266+ self .method = method
267+ self .max_size = max_size
268+ self .cache_key_index = cache_key_index
269+ self ._instances = {}
270+
271+ def __get__ (self , instance , owner ):
272+ if instance is None :
273+ return self
274+
275+ # Cache per-instance
276+ if instance not in self ._instances :
277+ bound_method = self .method .__get__ (instance , owner )
278+ self ._instances [instance ] = CachedFetcher (
279+ max_size = self .max_size ,
280+ method = bound_method ,
281+ cache_key_index = self .cache_key_index ,
282+ )
283+ return self ._instances [instance ]
284+
285+
286+ def cached_fetcher (max_size : int , cache_key_index : int = 0 ):
287+ """Wrapper for CachedFetcher. See example in CachedFetcher docstring."""
288+
289+ def wrapper (method ):
290+ return _CachedFetcherMethod (method , max_size , cache_key_index )
291+
292+ return wrapper
0 commit comments