1+ """
2+ A number of "plugins" for SubstrateInterface (and AsyncSubstrateInterface). At initial creation, it contains only
3+ Retry (sync and async versions).
4+ """
5+
16import asyncio
27import logging
8+ import socket
39from functools import partial
410from itertools import cycle
511from typing import Optional
12+
613from websockets .exceptions import ConnectionClosed
714
8- from async_substrate_interface .async_substrate import AsyncSubstrateInterface
15+ from async_substrate_interface .async_substrate import AsyncSubstrateInterface , Websocket
916from async_substrate_interface .errors import MaxRetriesExceeded
1017from async_substrate_interface .sync_substrate import SubstrateInterface
1118
6976
7077
7178class RetrySyncSubstrate (SubstrateInterface ):
79+ """
80+ A subclass of SubstrateInterface that allows for handling chain failures by using backup chains. If a sustained
81+ network failure is encountered on a chain endpoint, the object will initialize a new connection on the next chain in
82+ the `fallback_chains` list. If the `retry_forever` flag is set, upon reaching the last chain in `fallback_chains`,
83+ the connection will attempt to iterate over the list (starting with `url`) again.
84+
85+ E.g.
86+ ```
87+ substrate = RetrySyncSubstrate(
88+ "wss://entrypoint-finney.opentensor.ai:443",
89+ fallback_chains=["ws://127.0.0.1:9946"]
90+ )
91+ ```
92+ In this case, if there is a failure on entrypoint-finney, the connection will next attempt to hit localhost. If this
93+ also fails, a `MaxRetriesExceeded` exception will be raised.
94+
95+ ```
96+ substrate = RetrySyncSubstrate(
97+ "wss://entrypoint-finney.opentensor.ai:443",
98+ fallback_chains=["ws://127.0.0.1:9946"],
99+ retry_forever=True
100+ )
101+ ```
102+ In this case, rather than a MaxRetriesExceeded exception being raised upon failure of the second chain (localhost),
103+ the object will again being to initialize a new connection on entrypoint-finney, and then localhost, and so on and
104+ so forth.
105+ """
106+
72107 def __init__ (
73108 self ,
74109 url : str ,
@@ -117,6 +152,7 @@ def __init__(
117152 raise ConnectionError (
118153 f"Unable to connect at any chains specified: { [url ] + fallback_chains } "
119154 )
155+ # "connect" is only used by SubstrateInterface, not AsyncSubstrateInterface
120156 retry_methods = ["connect" ] + RETRY_METHODS
121157 self ._original_methods = {
122158 method : getattr (self , method ) for method in retry_methods
@@ -125,33 +161,19 @@ def __init__(
125161 setattr (self , method , partial (self ._retry , method ))
126162
127163 def _retry (self , method , * args , ** kwargs ):
164+ method_ = self ._original_methods [method ]
128165 try :
129- method_ = self ._original_methods [method ]
130166 return method_ (* args , ** kwargs )
131167 except (MaxRetriesExceeded , ConnectionError , EOFError , ConnectionClosed ) as e :
132168 try :
133169 self ._reinstantiate_substrate (e )
134- method_ = self ._original_methods [method ]
135170 return method_ (* args , ** kwargs )
136171 except StopIteration :
137172 logger .error (
138173 f"Max retries exceeded with { self .url } . No more fallback chains."
139174 )
140175 raise MaxRetriesExceeded
141176
142- def _retry_property (self , property_ ):
143- try :
144- return getattr (self , property_ )
145- except (MaxRetriesExceeded , ConnectionError , EOFError , ConnectionClosed ) as e :
146- try :
147- self ._reinstantiate_substrate (e )
148- return self ._retry_property (property_ )
149- except StopIteration :
150- logger .error (
151- f"Max retries exceeded with { self .url } . No more fallback chains."
152- )
153- raise MaxRetriesExceeded
154-
155177 def _reinstantiate_substrate (self , e : Optional [Exception ] = None ) -> None :
156178 next_network = next (self .fallback_chains )
157179 self .ws .close ()
@@ -170,6 +192,34 @@ def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
170192
171193
172194class RetryAsyncSubstrate (AsyncSubstrateInterface ):
195+ """
196+ A subclass of AsyncSubstrateInterface that allows for handling chain failures by using backup chains. If a
197+ sustained network failure is encountered on a chain endpoint, the object will initialize a new connection on
198+ the next chain in the `fallback_chains` list. If the `retry_forever` flag is set, upon reaching the last chain
199+ in `fallback_chains`, the connection will attempt to iterate over the list (starting with `url`) again.
200+
201+ E.g.
202+ ```
203+ substrate = RetryAsyncSubstrate(
204+ "wss://entrypoint-finney.opentensor.ai:443",
205+ fallback_chains=["ws://127.0.0.1:9946"]
206+ )
207+ ```
208+ In this case, if there is a failure on entrypoint-finney, the connection will next attempt to hit localhost. If this
209+ also fails, a `MaxRetriesExceeded` exception will be raised.
210+
211+ ```
212+ substrate = RetryAsyncSubstrate(
213+ "wss://entrypoint-finney.opentensor.ai:443",
214+ fallback_chains=["ws://127.0.0.1:9946"],
215+ retry_forever=True
216+ )
217+ ```
218+ In this case, rather than a MaxRetriesExceeded exception being raised upon failure of the second chain (localhost),
219+ the object will again being to initialize a new connection on entrypoint-finney, and then localhost, and so on and
220+ so forth.
221+ """
222+
173223 def __init__ (
174224 self ,
175225 url : str ,
@@ -212,62 +262,53 @@ def __init__(
212262 for method in RETRY_METHODS :
213263 setattr (self , method , partial (self ._retry , method ))
214264
215- def _reinstantiate_substrate (self , e : Optional [Exception ] = None ) -> None :
265+ async def _reinstantiate_substrate (self , e : Optional [Exception ] = None ) -> None :
216266 next_network = next (self .fallback_chains )
217267 if e .__class__ == MaxRetriesExceeded :
218268 logger .error (
219269 f"Max retries exceeded with { self .url } . Retrying with { next_network } ."
220270 )
221271 else :
222- print (f"Connection error. Trying again with { next_network } " )
223- super ().__init__ (
224- url = next_network ,
225- ss58_format = self .ss58_format ,
226- type_registry = self .type_registry ,
227- use_remote_preset = self .use_remote_preset ,
228- chain_name = self .chain_name ,
229- _mock = self ._mock ,
230- retry_timeout = self .retry_timeout ,
231- max_retries = self .max_retries ,
272+ logger .error (f"Connection error. Trying again with { next_network } " )
273+ try :
274+ await self .ws .shutdown ()
275+ except AttributeError :
276+ pass
277+ if self ._forgettable_task is not None :
278+ self ._forgettable_task : asyncio .Task
279+ self ._forgettable_task .cancel ()
280+ try :
281+ await self ._forgettable_task
282+ except asyncio .CancelledError :
283+ pass
284+ self .chain_endpoint = next_network
285+ self .url = next_network
286+ self .ws = Websocket (
287+ next_network ,
288+ options = {
289+ "max_size" : self .ws_max_size ,
290+ "write_limit" : 2 ** 16 ,
291+ },
232292 )
233- self ._original_methods = {
234- method : getattr (self , method ) for method in RETRY_METHODS
235- }
236- for method in RETRY_METHODS :
237- setattr (self , method , partial (self ._retry , method ))
293+ self ._initialized = False
294+ self ._initializing = False
295+ await self .initialize ()
238296
239297 async def _retry (self , method , * args , ** kwargs ):
298+ method_ = self ._original_methods [method ]
240299 try :
241- method_ = self ._original_methods [method ]
242300 return await method_ (* args , ** kwargs )
243301 except (
244302 MaxRetriesExceeded ,
245303 ConnectionError ,
246- ConnectionRefusedError ,
304+ ConnectionClosed ,
247305 EOFError ,
306+ socket .gaierror ,
248307 ) as e :
249308 try :
250- self ._reinstantiate_substrate (e )
251- await self .initialize ()
252- method_ = getattr (self , method )
253- if asyncio .iscoroutinefunction (method_ ):
254- return await method_ (* args , ** kwargs )
255- else :
256- return method_ (* args , ** kwargs )
257- except StopIteration :
258- logger .error (
259- f"Max retries exceeded with { self .url } . No more fallback chains."
260- )
261- raise MaxRetriesExceeded
262-
263- async def _retry_property (self , property_ ):
264- try :
265- return await getattr (self , property_ )
266- except (MaxRetriesExceeded , ConnectionError , ConnectionRefusedError ) as e :
267- try :
268- self ._reinstantiate_substrate (e )
269- return await self ._retry_property (property_ )
270- except StopIteration :
309+ await self ._reinstantiate_substrate (e )
310+ return await method_ (* args , ** kwargs )
311+ except StopAsyncIteration :
271312 logger .error (
272313 f"Max retries exceeded with { self .url } . No more fallback chains."
273314 )
0 commit comments