Skip to content

Commit c7e68a8

Browse files
committed
feat(http)!: Implement sane rate limiting locks, retries, and account for errors, including refactored logic.
1 parent b51983d commit c7e68a8

File tree

1 file changed

+104
-59
lines changed

1 file changed

+104
-59
lines changed

interactions/api/http.py

Lines changed: 104 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
import traceback
13
from asyncio import AbstractEventLoop, Lock, get_event_loop, get_running_loop
24
from json import dumps
35
from logging import Logger, getLogger
@@ -80,6 +82,19 @@ def bucket(self) -> str:
8082
"""
8183
return f"{self.channel_id}:{self.guild_id}:{self.path}"
8284

85+
@property
86+
def hashbucket(self) -> str:
87+
"""
88+
Returns the route's full bucket, reproducible for paeudo-hashing.
89+
This contains both bucket properties, but also the METHOD attribute.
90+
Note, that this does NOT contain the hash.
91+
92+
:return: The route bucket.
93+
:rtype: str
94+
"""
95+
96+
return f"{self.method}::{self.bucket}"
97+
8398

8499
class Limiter:
85100
"""
@@ -135,7 +150,7 @@ class Request:
135150
)
136151
token: str
137152
_loop: AbstractEventLoop
138-
ratelimits: Dict[Route, Limiter]
153+
ratelimits: Dict[str, Limiter] # hashbucket: Limiter
139154
_headers: dict
140155
_session: ClientSession
141156
_global_lock: Limiter
@@ -180,71 +195,101 @@ async def request(self, route: Route, **kwargs) -> Optional[Any]:
180195
:return: The contents of the request if any.
181196
:rtype: Optional[Any]
182197
"""
183-
self._check_session()
184-
await self._check_lock()
185198

186-
# This is the per-route check. We check BEFORE the request is made
187-
# to see if there's a rate limit for it. If there is, we'll call this
188-
# later in the event loop and reset the remaining time. Otherwise,
189-
# we'll set a "limiter" for it respective to that bucket. The hashes will
190-
# be checked later.
191-
if self.ratelimits.get(route):
192-
bucket: Limiter = self.ratelimits.get(route)
199+
kwargs["headers"] = {**self._headers, **kwargs.get("headers", {})}
200+
kwargs["headers"]["Content-Type"] = "application/json"
201+
202+
reason = kwargs.pop("reason", None)
203+
if reason:
204+
kwargs["headers"]["X-Audit-Log-Reason"] = quote(reason, safe="/ ")
205+
206+
# Huge credit and thanks to LordOfPolls for the lock/retry logic.
207+
208+
# This section generates the bucket through the hashbucket attr,
209+
# which essentially contains path, method, and major params.
210+
211+
if self.ratelimits.get(route.hashbucket):
212+
bucket: Limiter = self.ratelimits.get(route.hashbucket)
193213
if bucket.lock.locked():
194214
log.warning(
195215
f"The current bucket is still under a rate limit. Calling later in {bucket.reset_after} seconds."
196216
)
197-
self._loop.call_later(bucket.reset_after, bucket.lock)
198-
await bucket.lock.acquire()
217+
self._loop.call_later(bucket.reset_after, bucket.lock.release)
199218
bucket.reset_after = 0
200219
else:
201-
self.ratelimits.update({route: Limiter(lock=Lock(loop=self._loop))})
202-
203-
# We're controlling our HTTP request with the route as its own
204-
# separate lock here. This way, we can control the request of the
205-
# route as an asynchronous method. This way, if the event loop is to call on this later,
206-
# this will temporarily block but still allow to process the original request
207-
# we wanted to make.
208-
async with self.ratelimits.get(route) as _lock:
209-
kwargs["headers"] = {**self._headers, **kwargs.get("headers", {})}
210-
kwargs["headers"]["Content-Type"] = "application/json"
211-
212-
reason = kwargs.pop("reason", None)
213-
if reason:
214-
kwargs["headers"]["X-Audit-Log-Reason"] = quote(reason, safe="/ ")
215-
216-
async with self._session.request(
217-
route.method, route.__api__ + route.path, **kwargs
218-
) as response:
219-
data = await response.json(content_type=None)
220-
reset_after: str = response.headers.get("X-RateLimit-Reset-After")
221-
remaining: str = response.headers.get("X-RateLimit-Remaining")
222-
bucket: str = response.headers.get("X-RateLimit-Bucket")
223-
is_global: bool = response.headers.get("X-RateLimit-Global", False)
224-
225-
log.debug(f"{route.method}: {route.__api__ + route.path}: {kwargs}")
226-
log.debug(f"RETURN {response.status}: {dumps(data, indent=4, sort_keys=True)}")
227-
228-
if bucket not in _lock.hashes:
229-
_lock.hashes.append(bucket)
230-
231-
if isinstance(data, dict) and data.get("errors"):
232-
raise HTTPException(data["code"], message=data["message"])
233-
elif remaining and not int(remaining):
234-
if response.status == 429:
235-
log.warning(
236-
f"The HTTP client has encountered a per-route ratelimit. Locking down future requests for {reset_after} seconds."
237-
)
238-
_lock.reset_after = reset_after
239-
self._loop.call_later(_lock.reset_after, _lock.lock)
240-
elif is_global:
241-
log.warning(
242-
f"The HTTP client has encountered a global ratelimit. Locking down future requests for {reset_after} seconds."
243-
)
244-
self._global_lock.reset_after = reset_after
245-
self._loop.call_later(self._global_lock.reset_after, self._globl_lock.lock)
246-
247-
return data
220+
self.ratelimits.update({route.hashbucket: Limiter(lock=Lock(loop=self._loop))})
221+
bucket: Limiter = self.ratelimits.get(route.hashbucket)
222+
223+
await bucket.lock.acquire()
224+
225+
# Implement retry logic. The common seems to be 5, so this is hardcoded, for the most part.
226+
227+
for tries in range(5): # 3, 5? 5 seems to be common
228+
try:
229+
self._check_session()
230+
await self._check_lock()
231+
232+
async with self._session.request(
233+
route.method, route.__api__ + route.path, **kwargs
234+
) as response:
235+
236+
data = await response.json(content_type=None)
237+
reset_after: float = float(
238+
response.headers.get("X-RateLimit-Reset-After", "0.0")
239+
)
240+
remaining: str = response.headers.get("X-RateLimit-Remaining")
241+
_bucket: str = response.headers.get("X-RateLimit-Bucket")
242+
is_global: bool = response.headers.get("X-RateLimit-Global", False)
243+
244+
log.debug(f"{route.method}: {route.__api__ + route.path}: {kwargs}")
245+
246+
if _bucket not in bucket.hashes:
247+
bucket.hashes.append(_bucket)
248+
249+
if isinstance(data, dict) and data.get("errors"):
250+
raise HTTPException(data["code"], message=data["message"])
251+
elif remaining and not int(remaining):
252+
if response.status == 429:
253+
log.warning(
254+
f"The HTTP client has encountered a per-route ratelimit. Locking down future requests for {reset_after} seconds."
255+
)
256+
bucket.reset_after = reset_after
257+
await asyncio.sleep(bucket.reset_after)
258+
continue
259+
elif is_global:
260+
log.warning(
261+
f"The HTTP client has encountered a global ratelimit. Locking down future requests for {reset_after} seconds."
262+
)
263+
self._global_lock.reset_after = reset_after
264+
self._loop.call_later(
265+
self._global_lock.reset_after, self._global_lock.lock.release
266+
)
267+
268+
log.debug(f"RETURN {response.status}: {dumps(data, indent=4, sort_keys=True)}")
269+
return data
270+
271+
# These account for general/specific exceptions. (Windows...)
272+
except OSError as e:
273+
if tries < 4 and e.errno in (54, 10054):
274+
await asyncio.sleep(2 * tries + 1)
275+
continue
276+
try:
277+
bucket.lock.release()
278+
except RuntimeError:
279+
pass
280+
raise
281+
282+
# For generic exceptions we give a traceback for debug reasons.
283+
except Exception as e:
284+
try:
285+
bucket.lock.release()
286+
except RuntimeError:
287+
pass
288+
log.error("".join(traceback.format_exception(type(e), e, e.__traceback__)))
289+
break
290+
291+
if bucket.lock.locked():
292+
bucket.lock.release()
248293

249294
async def close(self) -> None:
250295
"""Closes the current session."""

0 commit comments

Comments
 (0)