|
18 | 18 | '{{"type":"next","id":"{query_id}","payload":{{"data":{{"number":{number}}}}}}}' |
19 | 19 | ) |
20 | 20 |
|
21 | | -COUNTING_DELAY = 2 * MS |
22 | | -PING_SENDING_DELAY = 5 * MS |
23 | | -PONG_TIMEOUT = 2 * MS |
| 21 | +COUNTING_DELAY = 20 * MS |
| 22 | +PING_SENDING_DELAY = 50 * MS |
| 23 | +PONG_TIMEOUT = 100 * MS |
24 | 24 |
|
25 | 25 | # List which can used to store received messages by the server |
26 | 26 | logged_messages: List[str] = [] |
@@ -48,100 +48,140 @@ async def server_countdown_template(ws, path): |
48 | 48 |
|
49 | 49 | count_found = search("count: {:d}", query) |
50 | 50 | count = count_found[0] |
51 | | - print(f"Countdown started from: {count}") |
| 51 | + print(f" Server: Countdown started from: {count}") |
52 | 52 |
|
53 | 53 | pong_received: asyncio.Event = asyncio.Event() |
54 | 54 |
|
55 | 55 | async def counting_coro(): |
56 | | - for number in range(count, -1, -1): |
57 | | - await ws.send( |
58 | | - countdown_server_answer.format(query_id=query_id, number=number) |
59 | | - ) |
60 | | - await asyncio.sleep(COUNTING_DELAY) |
| 56 | + print(" Server: counting task started") |
| 57 | + try: |
| 58 | + for number in range(count, -1, -1): |
| 59 | + await ws.send( |
| 60 | + countdown_server_answer.format( |
| 61 | + query_id=query_id, number=number |
| 62 | + ) |
| 63 | + ) |
| 64 | + await asyncio.sleep(COUNTING_DELAY) |
| 65 | + finally: |
| 66 | + print(" Server: counting task ended") |
61 | 67 |
|
| 68 | + print(" Server: starting counting task") |
62 | 69 | counting_task = asyncio.ensure_future(counting_coro()) |
63 | 70 |
|
64 | 71 | async def keepalive_coro(): |
65 | | - while True: |
66 | | - await asyncio.sleep(PING_SENDING_DELAY) |
67 | | - try: |
68 | | - # Send a ping |
69 | | - await WebSocketServerHelper.send_ping( |
70 | | - ws, payload="dummy_ping_payload" |
71 | | - ) |
72 | | - |
73 | | - # Wait for a pong |
| 72 | + print(" Server: keepalive task started") |
| 73 | + try: |
| 74 | + while True: |
| 75 | + await asyncio.sleep(PING_SENDING_DELAY) |
74 | 76 | try: |
75 | | - await asyncio.wait_for(pong_received.wait(), PONG_TIMEOUT) |
76 | | - except asyncio.TimeoutError: |
77 | | - print("\nNo pong received in time!\n") |
| 77 | + # Send a ping |
| 78 | + await WebSocketServerHelper.send_ping( |
| 79 | + ws, payload="dummy_ping_payload" |
| 80 | + ) |
| 81 | + |
| 82 | + # Wait for a pong |
| 83 | + try: |
| 84 | + await asyncio.wait_for( |
| 85 | + pong_received.wait(), PONG_TIMEOUT |
| 86 | + ) |
| 87 | + except asyncio.TimeoutError: |
| 88 | + print( |
| 89 | + "\n Server: No pong received in time!\n" |
| 90 | + ) |
| 91 | + break |
| 92 | + |
| 93 | + pong_received.clear() |
| 94 | + |
| 95 | + except websockets.exceptions.ConnectionClosed: |
78 | 96 | break |
79 | | - |
80 | | - pong_received.clear() |
81 | | - |
82 | | - except websockets.exceptions.ConnectionClosed: |
83 | | - break |
| 97 | + finally: |
| 98 | + print(" Server: keepalive task ended") |
84 | 99 |
|
85 | 100 | if keepalive: |
| 101 | + print(" Server: starting keepalive task") |
86 | 102 | keepalive_task = asyncio.ensure_future(keepalive_coro()) |
87 | 103 |
|
88 | 104 | async def receiving_coro(): |
89 | | - nonlocal counting_task |
90 | | - while True: |
91 | | - |
92 | | - try: |
93 | | - result = await ws.recv() |
94 | | - logged_messages.append(result) |
95 | | - except websockets.exceptions.ConnectionClosed: |
96 | | - break |
97 | | - |
98 | | - json_result = json.loads(result) |
99 | | - |
100 | | - answer_type = json_result["type"] |
101 | | - |
102 | | - if answer_type == "complete" and json_result["id"] == str(query_id): |
103 | | - print("Cancelling counting task now") |
104 | | - counting_task.cancel() |
105 | | - if keepalive: |
106 | | - print("Cancelling keep alive task now") |
107 | | - keepalive_task.cancel() |
108 | | - |
109 | | - elif answer_type == "ping": |
110 | | - if answer_pings: |
111 | | - payload = json_result.get("payload", None) |
112 | | - await WebSocketServerHelper.send_pong(ws, payload=payload) |
| 105 | + print(" Server: receiving task started") |
| 106 | + try: |
| 107 | + nonlocal counting_task |
| 108 | + while True: |
113 | 109 |
|
114 | | - elif answer_type == "pong": |
115 | | - pong_received.set() |
| 110 | + try: |
| 111 | + result = await ws.recv() |
| 112 | + logged_messages.append(result) |
| 113 | + except websockets.exceptions.ConnectionClosed: |
| 114 | + break |
116 | 115 |
|
| 116 | + json_result = json.loads(result) |
| 117 | + |
| 118 | + answer_type = json_result["type"] |
| 119 | + |
| 120 | + if answer_type == "complete" and json_result["id"] == str( |
| 121 | + query_id |
| 122 | + ): |
| 123 | + print("Cancelling counting task now") |
| 124 | + counting_task.cancel() |
| 125 | + if keepalive: |
| 126 | + print("Cancelling keep alive task now") |
| 127 | + keepalive_task.cancel() |
| 128 | + |
| 129 | + elif answer_type == "ping": |
| 130 | + if answer_pings: |
| 131 | + payload = json_result.get("payload", None) |
| 132 | + await WebSocketServerHelper.send_pong( |
| 133 | + ws, payload=payload |
| 134 | + ) |
| 135 | + |
| 136 | + elif answer_type == "pong": |
| 137 | + pong_received.set() |
| 138 | + finally: |
| 139 | + print(" Server: receiving task ended") |
| 140 | + if keepalive: |
| 141 | + keepalive_task.cancel() |
| 142 | + |
| 143 | + print(" Server: starting receiving task") |
117 | 144 | receiving_task = asyncio.ensure_future(receiving_coro()) |
118 | 145 |
|
119 | 146 | try: |
| 147 | + print(" Server: waiting for counting task to complete") |
120 | 148 | await counting_task |
121 | 149 | except asyncio.CancelledError: |
122 | | - print("Now counting task is cancelled") |
| 150 | + print(" Server: Now counting task is cancelled") |
123 | 151 |
|
124 | | - receiving_task.cancel() |
125 | | - |
126 | | - try: |
127 | | - await receiving_task |
128 | | - except asyncio.CancelledError: |
129 | | - print("Now receiving task is cancelled") |
| 152 | + print(" Server: sending complete message") |
| 153 | + await WebSocketServerHelper.send_complete(ws, query_id) |
130 | 154 |
|
131 | 155 | if keepalive: |
| 156 | + print(" Server: cancelling keepalive task") |
132 | 157 | keepalive_task.cancel() |
133 | 158 | try: |
134 | 159 | await keepalive_task |
135 | 160 | except asyncio.CancelledError: |
136 | | - print("Now keepalive task is cancelled") |
| 161 | + print(" Server: Now keepalive task is cancelled") |
| 162 | + |
| 163 | + print(" Server: waiting for client to close the connection") |
| 164 | + try: |
| 165 | + await asyncio.wait_for(receiving_task, 1000 * MS) |
| 166 | + except asyncio.TimeoutError: |
| 167 | + pass |
| 168 | + |
| 169 | + print(" Server: cancelling receiving task") |
| 170 | + receiving_task.cancel() |
| 171 | + |
| 172 | + try: |
| 173 | + await receiving_task |
| 174 | + except asyncio.CancelledError: |
| 175 | + print(" Server: Now receiving task is cancelled") |
137 | 176 |
|
138 | | - await WebSocketServerHelper.send_complete(ws, query_id) |
139 | 177 | except websockets.exceptions.ConnectionClosedOK: |
140 | 178 | pass |
141 | 179 | except AssertionError as e: |
142 | | - print(f"\nAssertion failed: {e!s}\n") |
| 180 | + print(f"\n Server: Assertion failed: {e!s}\n") |
143 | 181 | finally: |
| 182 | + print(" Server: waiting for websocket connection to close") |
144 | 183 | await ws.wait_closed() |
| 184 | + print(" Server: connection closed") |
145 | 185 |
|
146 | 186 | return server_countdown_template |
147 | 187 |
|
@@ -406,6 +446,7 @@ async def test_graphqlws_subscription_with_keepalive( |
406 | 446 | count -= 1 |
407 | 447 |
|
408 | 448 | assert count == -1 |
| 449 | + assert "ping" in session.transport.payloads |
409 | 450 | assert session.transport.payloads["ping"] == "dummy_ping_payload" |
410 | 451 | assert ( |
411 | 452 | session.transport.payloads["connection_ack"] == "dummy_connection_ack_payload" |
@@ -570,18 +611,19 @@ async def test_graphqlws_subscription_manual_pings_with_payload( |
570 | 611 | number = result["number"] |
571 | 612 | print(f"Number received: {number}") |
572 | 613 |
|
573 | | - assert number == count |
574 | | - count -= 1 |
575 | | - |
576 | 614 | payload = {"count_received": count} |
577 | 615 |
|
578 | 616 | await transport.send_ping(payload=payload) |
579 | 617 |
|
580 | | - await transport.pong_received.wait() |
| 618 | + await asyncio.wait_for(transport.pong_received.wait(), 10000 * MS) |
| 619 | + |
581 | 620 | transport.pong_received.clear() |
582 | 621 |
|
583 | 622 | assert transport.payloads["pong"] == payload |
584 | 623 |
|
| 624 | + assert number == count |
| 625 | + count -= 1 |
| 626 | + |
585 | 627 | assert count == -1 |
586 | 628 |
|
587 | 629 |
|
|
0 commit comments