Skip to content

Commit 4b0cb39

Browse files
committed
Tests
1 parent b2a1468 commit 4b0cb39

File tree

3 files changed

+157
-37
lines changed

3 files changed

+157
-37
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,6 @@ async def _wait_with_activity_timeout(self, coro, timeout: float):
607607
"""
608608
activity_task = asyncio.create_task(self._last_activity.wait())
609609

610-
# Handle both coroutines and tasks
611610
if isinstance(coro, asyncio.Task):
612611
main_task = coro
613612
else:
@@ -620,25 +619,22 @@ async def _wait_with_activity_timeout(self, coro, timeout: float):
620619
return_when=asyncio.FIRST_COMPLETED,
621620
)
622621

623-
if not done: # Timeout occurred
622+
if not done:
624623
logger.debug(f"Activity timeout after {timeout}s, no activity detected")
625624
for task in pending:
626625
task.cancel()
627626
raise TimeoutError()
628627

629-
# Check which completed
630628
if main_task in done:
631629
activity_task.cancel()
632630

633-
# Check if the task raised an exception
634631
exc = main_task.exception()
635632
if exc is not None:
636633
raise exc
637634
else:
638635
return main_task.result()
639-
else: # activity_task completed (activity occurred elsewhere)
636+
else:
640637
logger.debug("Activity detected, resetting timeout")
641-
# Recursively wait again with fresh timeout
642638
return await self._wait_with_activity_timeout(main_task, timeout)
643639

644640
except asyncio.CancelledError:
@@ -849,14 +845,12 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception:
849845
ws.recv(decode=False), self.retry_timeout
850846
)
851847
await self._reset_activity_timer()
852-
# reset the counter once we successfully receive something back
853848
self._attempts = 0
854849
await self._recv(recd)
855850
except websockets.exceptions.ConnectionClosedOK as e:
856851
logger.debug("ConnectionClosedOK")
857852
return e
858853
except Exception as e:
859-
logger.exception("Receiving exception", exc_info=e)
860854
if isinstance(e, ssl.SSLError):
861855
e = ConnectionClosed
862856
if not isinstance(
@@ -890,7 +884,6 @@ async def _start_sending(self, ws) -> Exception:
890884
logger.debug("Sent to websocket")
891885
await self._reset_activity_timer()
892886
except Exception as e:
893-
logger.exception("Maybe timeout? 769", exc_info=e)
894887
if isinstance(e, ssl.SSLError):
895888
e = ConnectionClosed
896889
if not isinstance(

tests/helpers/proxy_server.py

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1-
import asyncio
1+
import logging
2+
import time
23

3-
from websockets.asyncio.server import serve, ServerConnection
4-
from websockets.asyncio.client import connect
4+
from websockets.sync.server import serve, ServerConnection
5+
from websockets.sync.client import connect
6+
7+
logger = logging.getLogger("websockets.proxy")
58

69

710
class ProxyServer:
@@ -14,38 +17,41 @@ def __init__(self, upstream: str, time_til_pause: float, time_til_resume: float)
1417
self.shutdown_time = 0
1518
self.resume_time = 0
1619

17-
async def connect(self):
18-
self.upstream_connection = await connect(self.upstream_server)
19-
self.connection_time = asyncio.get_running_loop().time()
20+
def connect(self):
21+
self.upstream_connection = connect(self.upstream_server)
22+
self.connection_time = time.time()
2023
self.shutdown_time = self.connection_time + self.time_til_pause
2124
self.resume_time = self.shutdown_time + self.time_til_resume
2225

23-
async def close(self):
26+
def close(self):
2427
if self.upstream_connection:
25-
await self.upstream_connection.close()
26-
27-
async def proxy_request(self, websocket: ServerConnection):
28-
async for message in websocket:
29-
print(websocket)
30-
await self.upstream_connection.send(message)
31-
recd = await self.upstream_connection.recv()
32-
current_time = asyncio.get_running_loop().time()
28+
self.upstream_connection.close()
29+
self.server.shutdown()
30+
31+
def proxy_request(self, websocket: ServerConnection):
32+
for message in websocket:
33+
self.upstream_connection.send(message)
34+
recd = self.upstream_connection.recv()
35+
current_time = time.time()
3336
if self.shutdown_time < current_time < self.resume_time:
34-
print("Pausing")
35-
await asyncio.sleep(self.time_til_resume)
36-
await websocket.send(recd)
37-
# await websocket.send(message)
37+
logger.info("Pausing")
38+
time.sleep(self.time_til_resume)
39+
websocket.send(recd)
40+
41+
def serve(self):
42+
with serve(self.proxy_request, "localhost", 8080) as self.server:
43+
self.server.serve_forever()
3844

39-
async def serve(self):
40-
async with serve(self.proxy_request, "localhost", 8080) as server:
41-
await server.serve_forever()
45+
def connect_and_serve(self):
46+
self.connect()
47+
self.serve()
4248

4349

44-
async def main():
45-
proxy = ProxyServer("wss://archive.sub.latent.to", 20, 30)
46-
await proxy.connect()
47-
await proxy.serve()
50+
def run_proxy_server(time_til_pause: float = 20.0, time_til_resume: float = 30.0):
51+
proxy = ProxyServer("wss://archive.sub.latent.to", time_til_pause, time_til_resume)
52+
proxy.connect()
53+
proxy.serve()
4854

4955

5056
if __name__ == "__main__":
51-
asyncio.run(main())
57+
run_proxy_server()

tests/integration_tests/test_async_substrate_interface.py

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
import asyncio
2+
import logging
3+
import os.path
24
import time
5+
import threading
36

47
import pytest
58
from scalecodec import ss58_encode
69

7-
from async_substrate_interface.async_substrate import AsyncSubstrateInterface
10+
from async_substrate_interface.async_substrate import AsyncSubstrateInterface, logger
811
from async_substrate_interface.types import ScaleObj
912
from tests.helpers.settings import ARCHIVE_ENTRYPOINT, LATENT_LITE_ENTRYPOINT
13+
from tests.helpers.proxy_server import ProxyServer
1014

1115

1216
@pytest.mark.asyncio
@@ -174,3 +178,120 @@ async def test_query_map_with_odd_number_of_params():
174178
first_record = qm.records[0]
175179
assert len(first_record) == 2
176180
assert len(first_record[0]) == 4
181+
182+
183+
@pytest.mark.asyncio
184+
async def test_improved_reconnection_():
185+
ws_logger_path = "/tmp/websockets-proxy-test"
186+
ws_logger = logging.getLogger("websockets.proxy")
187+
if os.path.exists(ws_logger_path):
188+
os.remove(ws_logger_path)
189+
ws_logger.setLevel(logging.INFO)
190+
ws_logger.addHandler(logging.FileHandler(ws_logger_path))
191+
192+
asi_logger_path = "/tmp/async-substrate-interface-test"
193+
if os.path.exists(asi_logger_path):
194+
os.remove(asi_logger_path)
195+
logger.setLevel(logging.DEBUG)
196+
logger.addHandler(logging.FileHandler(asi_logger_path))
197+
198+
proxy = ProxyServer("wss://archive.sub.latent.to", 10, 20)
199+
proxy.connect()
200+
server_thread = threading.Thread(target=proxy.serve)
201+
server_thread.daemon = True
202+
server_thread.start()
203+
204+
await asyncio.sleep(3) # give the server start up time
205+
206+
try:
207+
async with AsyncSubstrateInterface(
208+
"ws://localhost:8080",
209+
ss58_format=42,
210+
chain_name="Bittensor",
211+
retry_timeout=10.0,
212+
ws_shutdown_timer=None,
213+
) as substrate:
214+
blocks_to_check = [
215+
5215000,
216+
5215001,
217+
5215002,
218+
5215003,
219+
5215004,
220+
5215005,
221+
5215006,
222+
]
223+
tasks = []
224+
for block in blocks_to_check:
225+
block_hash = await substrate.get_block_hash(block_id=block)
226+
tasks.append(
227+
substrate.query_map(
228+
"SubtensorModule", "TotalHotkeyShares", block_hash=block_hash
229+
)
230+
)
231+
records = await asyncio.gather(*tasks)
232+
assert len(records) == len(blocks_to_check)
233+
await substrate.close()
234+
with open(ws_logger_path, "r") as f:
235+
assert "Pausing" in f.read()
236+
with open(asi_logger_path, "r") as f:
237+
assert "Timeout/ConnectionClosed occurred." in f.read()
238+
finally:
239+
proxy.stop()
240+
server_thread.join(timeout=5)
241+
242+
243+
@pytest.mark.asyncio
244+
async def test_improved_reconnection():
245+
ws_logger_path = "/tmp/websockets-proxy-test"
246+
ws_logger = logging.getLogger("websockets.proxy")
247+
if os.path.exists(ws_logger_path):
248+
os.remove(ws_logger_path)
249+
ws_logger.setLevel(logging.INFO)
250+
ws_logger.addHandler(logging.FileHandler(ws_logger_path))
251+
252+
asi_logger_path = "/tmp/async-substrate-interface-test"
253+
if os.path.exists(asi_logger_path):
254+
os.remove(asi_logger_path)
255+
logger.setLevel(logging.DEBUG)
256+
logger.addHandler(logging.FileHandler(asi_logger_path))
257+
258+
proxy = ProxyServer("wss://archive.sub.latent.to", 10, 20)
259+
260+
server_thread = threading.Thread(target=proxy.connect_and_serve)
261+
server_thread.start()
262+
await asyncio.sleep(3) # give the server start up time
263+
async with AsyncSubstrateInterface(
264+
"ws://localhost:8080",
265+
ss58_format=42,
266+
chain_name="Bittensor",
267+
retry_timeout=10.0,
268+
ws_shutdown_timer=None,
269+
) as substrate:
270+
blocks_to_check = [
271+
5215000,
272+
5215001,
273+
5215002,
274+
5215003,
275+
5215004,
276+
5215005,
277+
5215006,
278+
]
279+
tasks = []
280+
for block in blocks_to_check:
281+
block_hash = await substrate.get_block_hash(block_id=block)
282+
tasks.append(
283+
substrate.query_map(
284+
"SubtensorModule", "TotalHotkeyShares", block_hash=block_hash
285+
)
286+
)
287+
records = await asyncio.gather(*tasks)
288+
assert len(records) == len(blocks_to_check)
289+
await substrate.close()
290+
with open(ws_logger_path, "r") as f:
291+
assert "Pausing" in f.read()
292+
with open(asi_logger_path, "r") as f:
293+
assert "Timeout/ConnectionClosed occurred." in f.read()
294+
shutdown_thread = threading.Thread(target=proxy.close)
295+
shutdown_thread.start()
296+
shutdown_thread.join(timeout=5)
297+
server_thread.join(timeout=5)

0 commit comments

Comments
 (0)