From 33ad794f57eea4ceb881c9a44226ccbfd3e04e84 Mon Sep 17 00:00:00 2001 From: Arthurdw Date: Mon, 17 Nov 2025 17:34:11 +0100 Subject: [PATCH 1/2] wip: fix issue with multiple concurrent calls fix: bla --- async_substrate_interface/async_substrate.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 076eb10..6835db9 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -694,9 +694,17 @@ async def _cancel(self): async def connect(self, force=False): if not force: - await self._lock.acquire() + async with self._lock: + return await self._connect_internal(force) else: logger.debug("Proceeding without acquiring lock.") + return await self._connect_internal(force) + + async def _connect_internal(self, force): + # Check state again after acquiring lock to avoid duplicate connections + if not force and self.state in (State.OPEN, State.CONNECTING): + return None + logger.debug(f"Websocket connecting to {self.ws_url}") if self._sending is None or self._sending.empty(): self._sending = asyncio.Queue() @@ -725,8 +733,6 @@ async def connect(self, force=False): except socket.gaierror: logger.debug(f"Hostname not known (this is just for testing") await asyncio.sleep(10) - if self._lock.locked(): - self._lock.release() return await self.connect(force=force) logger.debug("Connection established") self.ws = connection @@ -734,8 +740,6 @@ async def connect(self, force=False): self._send_recv_task = asyncio.get_running_loop().create_task( self._handler(self.ws) ) - if self._lock.locked(): - self._lock.release() return None async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: From ee3cb7037c06aecc3ab262af2c9cbad82ce8ba1a Mon Sep 17 00:00:00 2001 From: Arthurdw Date: Mon, 17 Nov 2025 17:48:40 +0100 Subject: [PATCH 2/2] tests: add concurrency test --- .../test_async_substrate_interface.py | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/integration_tests/test_async_substrate_interface.py b/tests/integration_tests/test_async_substrate_interface.py index 7fac0a9..5a480d3 100644 --- a/tests/integration_tests/test_async_substrate_interface.py +++ b/tests/integration_tests/test_async_substrate_interface.py @@ -293,3 +293,29 @@ async def test_get_payment_info(): assert partial_fee_all_options > partial_fee_no_era assert partial_fee_all_options > partial_fee_era print("test_get_payment_info succeeded") + + +@pytest.mark.asyncio +async def test_concurrent_rpc_requests(): + """ + Test that multiple concurrent RPC requests on a shared connection work correctly. + + This test verifies the fix for the issue where multiple concurrent tasks + re-initializing the WebSocket connection caused requests to hang. + """ + print("Testing test_concurrent_rpc_requests") + + async def concurrent_task(substrate, task_id): + """Make multiple RPC calls from a single task.""" + for i in range(5): + result = await substrate.get_block_number(None) + assert isinstance(result, int) + assert result > 0 + + async with AsyncSubstrateInterface(LATENT_LITE_ENTRYPOINT) as substrate: + # Run 5 concurrent tasks, each making 5 RPC calls (25 total) + # This tests that the connection is properly shared without re-initialization + tasks = [concurrent_task(substrate, i) for i in range(5)] + await asyncio.gather(*tasks) + + print("test_concurrent_rpc_requests succeeded")