Skip to content

Commit fac7d09

Browse files
committed
Tests: Fix broadcaster disconnection test race condition
1 parent 3d6bc24 commit fac7d09

File tree

1 file changed

+33
-12
lines changed

1 file changed

+33
-12
lines changed

tests/broadcaster_test.py

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from multiprocessing import Process
1414

1515
from fastapi_websocket_rpc.logger import get_logger, logging_config, LoggingModes
16+
1617
logging_config.set_mode(LoggingModes.LOGURU)
1718

1819
# Add parent path to use local src as package for tests
@@ -24,7 +25,11 @@
2425

2526
logger = get_logger("Test")
2627
logger.remove()
27-
logger.add(sys.stderr, format="<green>{time}</green> | {process} | <blue>{name: <50}</blue>|<level>{level:^6} | {message}</level>", level="INFO")
28+
logger.add(
29+
sys.stderr,
30+
format="<green>{time}</green> | {process} | <blue>{name: <50}</blue>|<level>{level:^6} | {message}</level>",
31+
level="INFO",
32+
)
2833

2934
# Configurable
3035
PORT = int(os.environ.get("PORT") or "7990")
@@ -37,26 +42,28 @@
3742
PG_HOST_PORT = 25432
3843
PG_SLEEP_TIME = 10
3944

45+
4046
@pytest.fixture()
4147
def postgres(request):
4248
CONTAINER_NAME = "broadcastdb" + "".join(
4349
[random.choice(string.ascii_letters) for _ in range(8)]
4450
)
4551

4652
def rm_container():
47-
os.system(f'docker rm -f {CONTAINER_NAME} > /dev/null 2>&1')
53+
os.system(f"docker rm -f {CONTAINER_NAME} > /dev/null 2>&1")
4854

49-
rm_container() # Make sure no previous container exists
55+
rm_container() # Make sure no previous container exists
5056

51-
postgres_args = ''
57+
postgres_args = ""
5258
timeout_marker = request.node.get_closest_marker("postgres_idle_timeout")
5359
if timeout_marker is not None:
5460
timeout = timeout_marker.args[0]
55-
postgres_args = f'-c idle_session_timeout={timeout} -c idle_in_transaction_session_timeout={timeout}'
56-
61+
postgres_args = f"-c idle_session_timeout={timeout} -c idle_in_transaction_session_timeout={timeout}"
5762

5863
logger.info(f"running postgres on host port {PG_HOST_PORT}...")
59-
os.system(f'docker run -d -p {PG_HOST_PORT}:5432 --name {CONTAINER_NAME} -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres postgres:alpine {postgres_args} > /dev/null 2>&1')
64+
os.system(
65+
f"docker run -d -p {PG_HOST_PORT}:5432 --name {CONTAINER_NAME} -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres postgres:alpine {postgres_args} > /dev/null 2>&1"
66+
)
6067
logger.info(f"Sleeping for {PG_SLEEP_TIME} seconds so postgres could stabilize")
6168
time.sleep(PG_SLEEP_TIME)
6269

@@ -65,18 +72,28 @@ def rm_container():
6572
finally:
6673
rm_container()
6774

75+
6876
def setup_pubsub_endpoint(app: FastAPI, broadcast_url: str, path: str):
6977
"""
7078
sets up endpoints on the fastapi app:
7179
- a pub/sub websocket endpoint for clients to connect to
7280
- a trigger endpoint that causes the pub/sub server to publish a message on a predefined topic
7381
"""
74-
logger.info(f"[{path} endpoint] connecting to broadcast backbone service on '{broadcast_url}'")
75-
endpoint = PubSubEndpoint(broadcaster=broadcast_url, ignore_broadcaster_disconnected=False)
82+
logger.info(
83+
f"[{path} endpoint] connecting to broadcast backbone service on '{broadcast_url}'"
84+
)
85+
endpoint = PubSubEndpoint(
86+
broadcaster=broadcast_url, ignore_broadcaster_disconnected=False
87+
)
7688

7789
@app.websocket(path)
7890
async def websocket_rpc_endpoint(websocket: WebSocket):
7991
await endpoint.main_loop(websocket)
92+
try:
93+
# Close connection if not already closed
94+
await websocket.close()
95+
except:
96+
pass
8097

8198
@app.get(f"{path}/trigger")
8299
async def trigger_events():
@@ -100,10 +117,11 @@ def setup_server(broadcast_url):
100117
logger.info("Running server app")
101118
uvicorn.run(app, port=PORT)
102119

120+
103121
@pytest.fixture()
104122
def server(postgres):
105123
# Run the server as a separate process
106-
proc = Process(target=setup_server, args=(postgres, ), daemon=True)
124+
proc = Process(target=setup_server, args=(postgres,), daemon=True)
107125
proc.start()
108126
logger.info("Server started on a daemon process")
109127
yield proc
@@ -132,7 +150,10 @@ async def on_event(data, topic):
132150

133151
async with PubSubClient() as client1:
134152
async with PubSubClient() as client2:
135-
for c, uri in [(client1,first_endpoint_uri), (client2,second_endpoint_uri)]:
153+
for c, uri in [
154+
(client1, first_endpoint_uri),
155+
(client2, second_endpoint_uri),
156+
]:
136157
c.subscribe(EVENT_TOPIC, on_event)
137158
c.start_client(uri)
138159
await c.wait_until_ready()
@@ -155,6 +176,7 @@ async def wait_for_sem():
155176
if repeat + 1 < repeats:
156177
await asyncio.sleep(interval)
157178

179+
158180
@pytest.mark.postgres_idle_timeout(3000)
159181
@pytest.mark.asyncio
160182
async def test_idle_pg_broadcaster_disconnect(server):
@@ -170,4 +192,3 @@ async def test_idle_pg_broadcaster_disconnect(server):
170192
- all servers (and clients) will get both of the messages
171193
"""
172194
await test_all_clients_get_a_topic_via_broadcast(server, repeats=3, interval=4)
173-

0 commit comments

Comments
 (0)