Skip to content

Commit 862280c

Browse files
FayeDelEepyElvyra
andauthored
feat(gateway): Client Latency support and introduce ready state (#548)
* feat(gateway): Client Latency support, introduce ready state, fix headers/docs * feat: Extend helper method to the client class. * fix: Add attempt to fix gateway recursion, minor refactor gateway header usage. * docs: Document new gateway latency attributes and ready state. Co-authored-by: EdVraz <88881326+EdVraz@users.noreply.github.com>
1 parent 38b3dba commit 862280c

File tree

4 files changed

+61
-19
lines changed

4 files changed

+61
-19
lines changed

interactions/api/gateway.py

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
new_event_loop,
1414
sleep,
1515
)
16-
from logging import Logger
1716
from sys import platform, version_info
17+
from time import perf_counter
1818
from typing import Any, Dict, List, Optional, Tuple, Union
1919

2020
from aiohttp import WSMessage
@@ -30,7 +30,7 @@
3030
from .models.misc import MISSING
3131
from .models.presence import ClientPresence
3232

33-
log: Logger = get_logger("gateway")
33+
log = get_logger("gateway")
3434

3535

3636
__all__ = ("_Heartbeat", "WebSocketClient")
@@ -69,9 +69,13 @@ class WebSocketClient:
6969
:ivar _Heartbeat __heartbeater: The context state of a "heartbeat" made to the Gateway.
7070
:ivar Optional[List[Tuple[int]]] __shard: The shards used during connection.
7171
:ivar Optional[ClientPresence] __presence: The presence used in connection.
72+
:ivar Event ready: The ready state of the client as an ``asyncio.Event``.
7273
:ivar Task __task: The closing task for ending connections.
73-
:ivar int session_id: The ID of the ongoing session.
74-
:ivar str sequence: The sequence identifier of the ongoing session.
74+
:ivar Optional[str] session_id: The ID of the ongoing session.
75+
:ivar Optional[int] sequence: The sequence identifier of the ongoing session.
76+
:ivar float _last_send: The latest time of the last send_packet function call since connection creation, in seconds.
77+
:ivar float _last_ack: The latest time of the last ``HEARTBEAT_ACK`` event since connection creation, in seconds.
78+
:ivar float latency: The latency of the connection, in seconds.
7579
"""
7680

7781
__slots__ = (
@@ -90,13 +94,16 @@ class WebSocketClient:
9094
"session_id",
9195
"sequence",
9296
"ready",
97+
"_last_send",
98+
"_last_ack",
99+
"latency",
93100
)
94101

95102
def __init__(
96103
self,
97104
token: str,
98105
intents: Intents,
99-
session_id: Optional[int] = MISSING,
106+
session_id: Optional[str] = MISSING,
100107
sequence: Optional[int] = MISSING,
101108
) -> None:
102109
"""
@@ -105,7 +112,7 @@ def __init__(
105112
:param intents: The Gateway intents of the application for event dispatch.
106113
:type intents: Intents
107114
:param session_id?: The ID of the session if trying to reconnect. Defaults to ``None``.
108-
:type session_id: Optional[int]
115+
:type session_id: Optional[str]
109116
:param sequence?: The identifier sequence if trying to reconnect. Defaults to ``None``.
110117
:type sequence: Optional[int]
111118
"""
@@ -132,7 +139,12 @@ def __init__(
132139
self.__task = None
133140
self.session_id = None if session_id is MISSING else session_id
134141
self.sequence = None if sequence is MISSING else sequence
135-
self.ready = None
142+
self.ready = Event(loop=self._loop) if version_info < (3, 10) else Event()
143+
144+
self._last_send = perf_counter()
145+
self._last_ack = perf_counter()
146+
self.latency: float("nan") # noqa: F821
147+
# self.latency has to be noqa, this is valid in python but not in Flake8.
136148

137149
async def _manage_heartbeat(self) -> None:
138150
"""Manages the heartbeat loop."""
@@ -186,7 +198,7 @@ async def _establish_connection(
186198
continue
187199
if self._client.close_code in range(4010, 4014) or self._client.close_code == 4004:
188200
raise GatewayException(self._client.close_code)
189-
elif self._client.close_code is not None:
201+
elif self._closed: # Redundant conditional.
190202
await self._establish_connection()
191203

192204
await self._handle_connection(stream, shard, presence)
@@ -226,8 +238,10 @@ async def _handle_connection(
226238
if op == OpCodeType.HEARTBEAT:
227239
await self.__heartbeat()
228240
if op == OpCodeType.HEARTBEAT_ACK:
241+
self._last_ack = perf_counter()
229242
log.debug("HEARTBEAT_ACK")
230243
self.__heartbeater.event.set()
244+
self.latency = self._last_ack - self._last_send
231245
if op in (OpCodeType.INVALIDATE_SESSION, OpCodeType.RECONNECT):
232246
log.debug("INVALID_SESSION/RECONNECT")
233247

@@ -243,10 +257,15 @@ async def _handle_connection(
243257
self.sequence = stream["s"]
244258
self._dispatch.dispatch("on_ready")
245259
log.debug(f"READY (session_id: {self.session_id}, seq: {self.sequence})")
260+
self.ready.set()
246261
else:
247262
log.debug(f"{event}: {data}")
248263
self._dispatch_event(event, data)
249264

265+
async def wait_until_ready(self):
266+
"""Waits for the client to become ready according to the Gateway."""
267+
await self.ready.wait()
268+
250269
def _dispatch_event(self, event: str, data: dict) -> None:
251270
"""
252271
Dispatches an event from the Gateway.
@@ -282,7 +301,7 @@ def _dispatch_event(self, event: str, data: dict) -> None:
282301
)
283302
else:
284303
_context = self.__contextualize(data)
285-
_name: str
304+
_name: str = ""
286305
__args: list = [_context]
287306
__kwargs: dict = {}
288307

@@ -339,9 +358,9 @@ def _dispatch_event(self, event: str, data: dict) -> None:
339358

340359
self._dispatch.dispatch("on_modal", _context)
341360

342-
self._dispatch.dispatch(_name, *__args, **__kwargs)
343-
self._dispatch.dispatch("on_interaction", _context)
344-
self._dispatch.dispatch("on_interaction_create", _context)
361+
self._dispatch.dispatch(_name, *__args, **__kwargs)
362+
self._dispatch.dispatch("on_interaction", _context)
363+
self._dispatch.dispatch("on_interaction_create", _context)
345364

346365
self._dispatch.dispatch("raw_socket_create", data)
347366

@@ -459,6 +478,7 @@ async def _send_packet(self, data: Dict[str, Any]) -> None:
459478
:param data: The data to send to the Gateway.
460479
:type data: Dict[str, Any]
461480
"""
481+
self._last_send = perf_counter()
462482
packet: str = dumps(data).decode("utf-8") if isinstance(data, dict) else data
463483
await self._client.send_str(packet)
464484
log.debug(packet)

interactions/api/gateway.pyi

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,19 @@ from asyncio import (
44
Task,
55
)
66
from logging import Logger
7-
from typing import Any, Dict, List, Optional, Tuple
7+
from typing import Any, Dict, List, Optional, Tuple, Union, Iterable
88

99
from aiohttp import ClientWebSocketResponse
1010

11-
from ..base import get_logger
11+
from ..models import Option
1212
from ..api.models.misc import MISSING
1313
from ..api.models.presence import ClientPresence
1414
from .dispatch import Listener
1515
from .http import HTTPClient
1616
from .models.flags import Intents
1717

18-
log: Logger = get_logger("gateway")
19-
20-
__all__ = ("_Heartbeat", "WebSocketClient")
18+
log: Logger
19+
__all__: Iterable[str]
2120

2221
class _Heartbeat:
2322
event: Event
@@ -37,8 +36,13 @@ class WebSocketClient:
3736
__shard: Optional[List[Tuple[int]]]
3837
__presence: Optional[ClientPresence]
3938
__task: Optional[Task]
40-
session_id: int
41-
sequence: str
39+
session_id: Optional[str]
40+
sequence: Optional[int]
41+
_last_send: float
42+
_last_ack: float
43+
latency: float
44+
ready: Event
45+
4246
def __init__(
4347
self,
4448
token: str,
@@ -57,6 +61,11 @@ class WebSocketClient:
5761
shard: Optional[List[Tuple[int]]] = MISSING,
5862
presence: Optional[ClientPresence] = MISSING,
5963
) -> None: ...
64+
async def wait_until_ready(self) -> None: ...
65+
def _dispatch_event(self, event: str, data: dict) -> None: ...
66+
def __contextualize(self, data: dict) -> object: ...
67+
def __sub_command_context(self, data: Union[dict, Option]) -> Union[Tuple[str], dict]: ...
68+
def __option_type_context(self, context: object, type: int) -> dict: ...
6069
@property
6170
async def __receive_packet_stream(self) -> Optional[Dict[str, Any]]: ...
6271
async def _send_packet(self, data: Dict[str, Any]) -> None: ...

interactions/client.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ def __init__(
9898
data = self._loop.run_until_complete(self._http.get_current_bot_information())
9999
self.me = Application(**data)
100100

101+
@property
102+
def latency(self) -> float:
103+
"""Returns the connection latency in milliseconds."""
104+
105+
return self._websocket.latency * 1000
106+
101107
def start(self) -> None:
102108
"""Starts the client session."""
103109
self._loop.run_until_complete(self._ready())
@@ -305,6 +311,10 @@ async def _login(self) -> None:
305311
while not self._websocket._closed:
306312
await self._websocket._establish_connection(self._shard, self._presence)
307313

314+
async def wait_until_ready(self) -> None:
315+
"""Helper method that waits until the websocket is ready."""
316+
await self._websocket.wait_until_ready()
317+
308318
def event(self, coro: Coroutine, name: Optional[str] = MISSING) -> Callable[..., Any]:
309319
"""
310320
A decorator for listening to events dispatched from the

interactions/client.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class Client:
3434
token: str,
3535
**kwargs,
3636
) -> None: ...
37+
@property
38+
def latency(self) -> float: ...
3739
def start(self) -> None: ...
3840
def __register_events(self) -> None: ...
3941
async def __compare_sync(self, data: dict) -> None: ...
@@ -44,6 +46,7 @@ class Client:
4446
async def _synchronize(self, payload: Optional[dict] = None) -> None: ...
4547
async def _ready(self) -> None: ...
4648
async def _login(self) -> None: ...
49+
async def wait_until_ready(self) -> None: ...
4750
def event(self, coro: Coroutine, name: Optional[str] = None) -> Callable[..., Any]: ...
4851
def __check_command(
4952
self,

0 commit comments

Comments
 (0)