99from pathlib import Path
1010from typing import Callable , Any , Awaitable , Hashable , Optional
1111
12+ import aiosqlite
13+
14+
1215USE_CACHE = True if os .getenv ("NO_CACHE" ) != "1" else False
1316CACHE_LOCATION = (
1417 os .path .expanduser (
2124logger = logging .getLogger ("async_substrate_interface" )
2225
2326
27+ class AsyncSqliteDB :
28+ _instances : dict [str , "AsyncSqliteDB" ] = {}
29+ _db : Optional [aiosqlite .Connection ] = None
30+
31+ def __new__ (cls , chain_endpoint : str ):
32+ try :
33+ return cls ._instances [chain_endpoint ]
34+ except KeyError :
35+ instance = super ().__new__ (cls )
36+ cls ._instances [chain_endpoint ] = instance
37+ return instance
38+
39+ async def __call__ (self , chain , func , args , kwargs ) -> Optional [Any ]:
40+ if not self ._db :
41+ _ensure_dir ()
42+ self ._db = await aiosqlite .connect (CACHE_LOCATION )
43+ table_name = _get_table_name (func )
44+ key = None
45+ if not (local_chain := _check_if_local (chain )) or not USE_CACHE :
46+ await self ._db .execute (
47+ f"""CREATE TABLE IF NOT EXISTS { table_name }
48+ (
49+ rowid INTEGER PRIMARY KEY AUTOINCREMENT,
50+ key BLOB,
51+ value BLOB,
52+ chain TEXT,
53+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
54+ );
55+ """
56+ )
57+ await self ._db .execute (
58+ f"""CREATE TRIGGER IF NOT EXISTS prune_rows_trigger AFTER INSERT ON { table_name }
59+ BEGIN
60+ DELETE FROM { table_name }
61+ WHERE rowid IN (
62+ SELECT rowid FROM { table_name }
63+ ORDER BY created_at DESC
64+ LIMIT -1 OFFSET 500
65+ );
66+ END;"""
67+ )
68+ key = pickle .dumps ((args , kwargs ))
69+ try :
70+ cursor : aiosqlite .Cursor = await self ._db .execute (
71+ f"SELECT value FROM { table_name } WHERE key=? AND chain=?" ,
72+ (key , chain ),
73+ )
74+ result = await cursor .fetchone ()
75+ if result is not None :
76+ return pickle .loads (result [0 ])
77+ except (pickle .PickleError , sqlite3 .Error ) as e :
78+ logger .exception ("Cache error" , exc_info = e )
79+ pass
80+
81+ result = await func (* args , ** kwargs )
82+ if not local_chain or not USE_CACHE :
83+ # TODO use a task here
84+ await self ._db .execute (
85+ f"INSERT OR REPLACE INTO { table_name } (key, value, chain) VALUES (?,?,?)" ,
86+ (key , pickle .dumps (result ), chain ),
87+ )
88+ return result
89+
90+
2491def _ensure_dir ():
2592 path = Path (CACHE_LOCATION ).parent
2693 if not path .exists ():
@@ -115,7 +182,8 @@ def inner(self, *args, **kwargs):
115182 )
116183
117184 # If not in DB, call func and store in DB
118- result = func (self , * args , ** kwargs )
185+ if result is None :
186+ result = func (self , * args , ** kwargs )
119187
120188 if not local_chain or not USE_CACHE :
121189 _insert_into_cache (c , conn , table_name , key , result , chain )
@@ -131,15 +199,8 @@ def async_sql_lru_cache(maxsize: Optional[int] = None):
131199 def decorator (func ):
132200 @cached_fetcher (max_size = maxsize )
133201 async def inner (self , * args , ** kwargs ):
134- c , conn , table_name , key , result , chain , local_chain = (
135- _shared_inner_fn_logic (func , self , args , kwargs )
136- )
137-
138- # If not in DB, call func and store in DB
139- result = await func (self , * args , ** kwargs )
140- if not local_chain or not USE_CACHE :
141- _insert_into_cache (c , conn , table_name , key , result , chain )
142-
202+ async_sql_db = AsyncSqliteDB (self .url )
203+ result = await async_sql_db (self .url , func , args , kwargs )
143204 return result
144205
145206 return inner
0 commit comments