Skip to content

Commit c24c3d0

Browse files
committed
Reuses the websocket for sync Substrate
1 parent 4666bfe commit c24c3d0

File tree

1 file changed

+68
-60
lines changed

1 file changed

+68
-60
lines changed

async_substrate_interface/sync_substrate.py

Lines changed: 68 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from scalecodec.base import RuntimeConfigurationObject, ScaleBytes, ScaleType
1717
from websockets.sync.client import connect
18+
from websockets.exceptions import ConnectionClosed
1819

1920
from async_substrate_interface.errors import (
2021
ExtrinsicNotFound,
@@ -507,6 +508,7 @@ def __init__(
507508
self._metadata_cache = {}
508509
self.metadata_version_hex = "0x0f000000" # v15
509510
self.reload_type_registry()
511+
self.ws = self.connect(init=True)
510512
if not _mock:
511513
self.initialize()
512514

@@ -527,7 +529,7 @@ def initialize(self):
527529
self.initialized = True
528530

529531
def __exit__(self, exc_type, exc_val, exc_tb):
530-
pass
532+
self.ws.close()
531533

532534
@property
533535
def properties(self):
@@ -562,6 +564,15 @@ def name(self):
562564
self._name = self.rpc_request("system_name", []).get("result")
563565
return self._name
564566

567+
def connect(self, init=False):
568+
if init is True:
569+
return connect(self.chain_endpoint, max_size=2**32)
570+
else:
571+
if not self.ws.close_code:
572+
return self.ws
573+
else:
574+
return connect(self.chain_endpoint, max_size=2**32)
575+
565576
def get_storage_item(self, module: str, storage_function: str):
566577
if not self._metadata:
567578
self.init_runtime()
@@ -1620,69 +1631,67 @@ def _make_rpc_request(
16201631
_received = {}
16211632
subscription_added = False
16221633

1623-
with connect(self.chain_endpoint, max_size=2**32) as ws:
1624-
item_id = 0
1625-
for payload in payloads:
1626-
item_id += 1
1627-
ws.send(json.dumps({**payload["payload"], **{"id": item_id}}))
1628-
request_manager.add_request(item_id, payload["id"])
1629-
1630-
while True:
1631-
try:
1632-
response = json.loads(
1633-
ws.recv(timeout=self.retry_timeout, decode=False)
1634+
ws = self.connect(init=False if attempt == 1 else True)
1635+
item_id = 0
1636+
for payload in payloads:
1637+
item_id += 1
1638+
ws.send(json.dumps({**payload["payload"], **{"id": item_id}}))
1639+
request_manager.add_request(item_id, payload["id"])
1640+
1641+
while True:
1642+
try:
1643+
response = json.loads(ws.recv(timeout=self.retry_timeout, decode=False))
1644+
except (TimeoutError, ConnectionClosed):
1645+
if attempt >= self.max_retries:
1646+
logging.warning(
1647+
f"Timed out waiting for RPC requests {attempt} times. Exiting."
16341648
)
1635-
except TimeoutError:
1636-
if attempt >= self.max_retries:
1637-
logging.warning(
1638-
f"Timed out waiting for RPC requests {attempt} times. Exiting."
1639-
)
1640-
raise SubstrateRequestException("Max retries reached.")
1641-
else:
1642-
return self._make_rpc_request(
1643-
payloads,
1649+
raise SubstrateRequestException("Max retries reached.")
1650+
else:
1651+
return self._make_rpc_request(
1652+
payloads,
1653+
value_scale_type,
1654+
storage_item,
1655+
result_handler,
1656+
attempt + 1,
1657+
)
1658+
if "id" in response:
1659+
_received[response["id"]] = response
1660+
elif "params" in response:
1661+
_received[response["params"]["subscription"]] = response
1662+
else:
1663+
raise SubstrateRequestException(response)
1664+
for item_id in list(request_manager.response_map.keys()):
1665+
if item_id not in request_manager.responses or isinstance(
1666+
result_handler, Callable
1667+
):
1668+
if response := _received.pop(item_id):
1669+
if (
1670+
isinstance(result_handler, Callable)
1671+
and not subscription_added
1672+
):
1673+
# handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
1674+
# with {subscription_id : payload_id}
1675+
try:
1676+
item_id = request_manager.overwrite_request(
1677+
item_id, response["result"]
1678+
)
1679+
subscription_added = True
1680+
except KeyError:
1681+
raise SubstrateRequestException(str(response))
1682+
decoded_response, complete = self._process_response(
1683+
response,
1684+
item_id,
16441685
value_scale_type,
16451686
storage_item,
16461687
result_handler,
1647-
attempt + 1,
16481688
)
1649-
if "id" in response:
1650-
_received[response["id"]] = response
1651-
elif "params" in response:
1652-
_received[response["params"]["subscription"]] = response
1653-
else:
1654-
raise SubstrateRequestException(response)
1655-
for item_id in list(request_manager.response_map.keys()):
1656-
if item_id not in request_manager.responses or isinstance(
1657-
result_handler, Callable
1658-
):
1659-
if response := _received.pop(item_id):
1660-
if (
1661-
isinstance(result_handler, Callable)
1662-
and not subscription_added
1663-
):
1664-
# handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
1665-
# with {subscription_id : payload_id}
1666-
try:
1667-
item_id = request_manager.overwrite_request(
1668-
item_id, response["result"]
1669-
)
1670-
subscription_added = True
1671-
except KeyError:
1672-
raise SubstrateRequestException(str(response))
1673-
decoded_response, complete = self._process_response(
1674-
response,
1675-
item_id,
1676-
value_scale_type,
1677-
storage_item,
1678-
result_handler,
1679-
)
1680-
request_manager.add_response(
1681-
item_id, decoded_response, complete
1682-
)
1689+
request_manager.add_response(
1690+
item_id, decoded_response, complete
1691+
)
16831692

1684-
if request_manager.is_complete:
1685-
break
1693+
if request_manager.is_complete:
1694+
break
16861695

16871696
return request_manager.get_results()
16881697

@@ -2874,9 +2883,8 @@ def close(self):
28742883
"""
28752884
Closes the substrate connection, and the websocket connection.
28762885
"""
2877-
# TODO change this logic
28782886
try:
2879-
self.ws.shutdown()
2887+
self.ws.close()
28802888
except AttributeError:
28812889
pass
28822890

0 commit comments

Comments
 (0)