1515)
1616from scalecodec .base import RuntimeConfigurationObject , ScaleBytes , ScaleType
1717from websockets .sync .client import connect
18+ from websockets .exceptions import ConnectionClosed
1819
1920from async_substrate_interface .errors import (
2021 ExtrinsicNotFound ,
@@ -507,6 +508,7 @@ def __init__(
507508 self ._metadata_cache = {}
508509 self .metadata_version_hex = "0x0f000000" # v15
509510 self .reload_type_registry ()
511+ self .ws = self .connect (init = True )
510512 if not _mock :
511513 self .initialize ()
512514
@@ -527,7 +529,7 @@ def initialize(self):
527529 self .initialized = True
528530
529531 def __exit__ (self , exc_type , exc_val , exc_tb ):
530- pass
532+ self . ws . close ()
531533
532534 @property
533535 def properties (self ):
@@ -562,6 +564,15 @@ def name(self):
562564 self ._name = self .rpc_request ("system_name" , []).get ("result" )
563565 return self ._name
564566
567+ def connect (self , init = False ):
568+ if init is True :
569+ return connect (self .chain_endpoint , max_size = self .ws_max_size )
570+ else :
571+ if not self .ws .close_code :
572+ return self .ws
573+ else :
574+ return connect (self .chain_endpoint , max_size = self .ws_max_size )
575+
565576 def get_storage_item (self , module : str , storage_function : str ):
566577 if not self ._metadata :
567578 self .init_runtime ()
@@ -1620,69 +1631,67 @@ def _make_rpc_request(
16201631 _received = {}
16211632 subscription_added = False
16221633
1623- with connect (self .chain_endpoint , max_size = 2 ** 32 ) as ws :
1624- item_id = 0
1625- for payload in payloads :
1626- item_id += 1
1627- ws .send (json .dumps ({** payload ["payload" ], ** {"id" : item_id }}))
1628- request_manager .add_request (item_id , payload ["id" ])
1629-
1630- while True :
1631- try :
1632- response = json .loads (
1633- ws .recv (timeout = self .retry_timeout , decode = False )
1634+ ws = self .connect (init = False if attempt == 1 else True )
1635+ item_id = 0
1636+ for payload in payloads :
1637+ item_id += 1
1638+ ws .send (json .dumps ({** payload ["payload" ], ** {"id" : item_id }}))
1639+ request_manager .add_request (item_id , payload ["id" ])
1640+
1641+ while True :
1642+ try :
1643+ response = json .loads (ws .recv (timeout = self .retry_timeout , decode = False ))
1644+ except (TimeoutError , ConnectionClosed ):
1645+ if attempt >= self .max_retries :
1646+ logging .warning (
1647+ f"Timed out waiting for RPC requests { attempt } times. Exiting."
16341648 )
1635- except TimeoutError :
1636- if attempt >= self .max_retries :
1637- logging .warning (
1638- f"Timed out waiting for RPC requests { attempt } times. Exiting."
1639- )
1640- raise SubstrateRequestException ("Max retries reached." )
1641- else :
1642- return self ._make_rpc_request (
1643- payloads ,
1649+ raise SubstrateRequestException ("Max retries reached." )
1650+ else :
1651+ return self ._make_rpc_request (
1652+ payloads ,
1653+ value_scale_type ,
1654+ storage_item ,
1655+ result_handler ,
1656+ attempt + 1 ,
1657+ )
1658+ if "id" in response :
1659+ _received [response ["id" ]] = response
1660+ elif "params" in response :
1661+ _received [response ["params" ]["subscription" ]] = response
1662+ else :
1663+ raise SubstrateRequestException (response )
1664+ for item_id in list (request_manager .response_map .keys ()):
1665+ if item_id not in request_manager .responses or isinstance (
1666+ result_handler , Callable
1667+ ):
1668+ if response := _received .pop (item_id ):
1669+ if (
1670+ isinstance (result_handler , Callable )
1671+ and not subscription_added
1672+ ):
1673+ # handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
1674+ # with {subscription_id : payload_id}
1675+ try :
1676+ item_id = request_manager .overwrite_request (
1677+ item_id , response ["result" ]
1678+ )
1679+ subscription_added = True
1680+ except KeyError :
1681+ raise SubstrateRequestException (str (response ))
1682+ decoded_response , complete = self ._process_response (
1683+ response ,
1684+ item_id ,
16441685 value_scale_type ,
16451686 storage_item ,
16461687 result_handler ,
1647- attempt + 1 ,
16481688 )
1649- if "id" in response :
1650- _received [response ["id" ]] = response
1651- elif "params" in response :
1652- _received [response ["params" ]["subscription" ]] = response
1653- else :
1654- raise SubstrateRequestException (response )
1655- for item_id in list (request_manager .response_map .keys ()):
1656- if item_id not in request_manager .responses or isinstance (
1657- result_handler , Callable
1658- ):
1659- if response := _received .pop (item_id ):
1660- if (
1661- isinstance (result_handler , Callable )
1662- and not subscription_added
1663- ):
1664- # handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
1665- # with {subscription_id : payload_id}
1666- try :
1667- item_id = request_manager .overwrite_request (
1668- item_id , response ["result" ]
1669- )
1670- subscription_added = True
1671- except KeyError :
1672- raise SubstrateRequestException (str (response ))
1673- decoded_response , complete = self ._process_response (
1674- response ,
1675- item_id ,
1676- value_scale_type ,
1677- storage_item ,
1678- result_handler ,
1679- )
1680- request_manager .add_response (
1681- item_id , decoded_response , complete
1682- )
1689+ request_manager .add_response (
1690+ item_id , decoded_response , complete
1691+ )
16831692
1684- if request_manager .is_complete :
1685- break
1693+ if request_manager .is_complete :
1694+ break
16861695
16871696 return request_manager .get_results ()
16881697
@@ -2874,9 +2883,8 @@ def close(self):
28742883 """
28752884 Closes the substrate connection, and the websocket connection.
28762885 """
2877- # TODO change this logic
28782886 try :
2879- self .ws .shutdown ()
2887+ self .ws .close ()
28802888 except AttributeError :
28812889 pass
28822890
0 commit comments