3434 PyMongoError ,
3535)
3636from pymongo .helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
37+ from pymongo .lock import _async_create_lock
3738
3839_IS_SYNC = False
3940
@@ -78,7 +79,10 @@ async def inner(*args: Any, **kwargs: Any) -> Any:
7879_MAX_RETRIES = 3
7980_BACKOFF_INITIAL = 0.05
8081_BACKOFF_MAX = 10
81- _TIME = time
82+ # DRIVERS-3240 will determine these defaults.
83+ DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
84+ DEFAULT_RETRY_TOKEN_RETURN = 0.1
85+ _TIME = time # Added so synchro script doesn't remove the time import.
8286
8387
8488async def _backoff (
@@ -89,23 +93,95 @@ async def _backoff(
8993 await asyncio .sleep (backoff )
9094
9195
96+ class _TokenBucket :
97+ """A token bucket implementation for rate limiting."""
98+
99+ def __init__ (
100+ self ,
101+ capacity : float = DEFAULT_RETRY_TOKEN_CAPACITY ,
102+ return_rate : float = DEFAULT_RETRY_TOKEN_RETURN ,
103+ ):
104+ self .lock = _async_create_lock ()
105+ self .capacity = capacity
106+ # DRIVERS-3240 will determine how full the bucket should start.
107+ self .tokens = capacity
108+ self .return_rate = return_rate
109+
110+ async def consume (self ) -> bool :
111+ """Consume a token from the bucket if available."""
112+ async with self .lock :
113+ if self .tokens >= 1 :
114+ self .tokens -= 1
115+ return True
116+ return False
117+
118+ async def deposit (self , retry : bool = False ) -> None :
119+ """Deposit a token back into the bucket."""
120+ retry_token = 1 if retry else 0
121+ async with self .lock :
122+ self .tokens = min (self .capacity , self .tokens + retry_token + self .return_rate )
123+
124+
125+ class _RetryPolicy :
126+ """A retry limiter that performs exponential backoff with jitter.
127+
128+ Retry attempts are limited by a token bucket to prevent overwhelming the server during
129+ a prolonged outage or high load.
130+ """
131+
132+ def __init__ (
133+ self ,
134+ token_bucket : _TokenBucket ,
135+ attempts : int = _MAX_RETRIES ,
136+ backoff_initial : float = _BACKOFF_INITIAL ,
137+ backoff_max : float = _BACKOFF_MAX ,
138+ ):
139+ self .token_bucket = token_bucket
140+ self .attempts = attempts
141+ self .backoff_initial = backoff_initial
142+ self .backoff_max = backoff_max
143+
144+ async def record_success (self , retry : bool ):
145+ """Record a successful operation."""
146+ await self .token_bucket .deposit (retry )
147+
148+ async def backoff (self , attempt : int ) -> None :
149+ """Return the backoff duration for the given ."""
150+ await _backoff (max (0 , attempt - 1 ), self .backoff_initial , self .backoff_max )
151+
152+ async def should_retry (self , attempt : int ) -> bool :
153+ """Return if we have budget to retry and how long to backoff."""
154+ # TODO: Check CSOT deadline here.
155+ if attempt > self .attempts :
156+ return False
157+ # Check token bucket last since we only want to consume a token if we actually retry.
158+ if not await self .token_bucket .consume ():
159+ # DRIVERS-3246 Improve diagnostics when this case happens.
160+ # We could add info to the exception and log.
161+ return False
162+ return True
163+
164+
92165def _retry_overload (func : F ) -> F :
93166 @functools .wraps (func )
94- async def inner (* args : Any , ** kwargs : Any ) -> Any :
167+ async def inner (self : Any , * args : Any , ** kwargs : Any ) -> Any :
168+ retry_policy = self ._retry_policy
95169 attempt = 0
96170 while True :
97171 try :
98- return await func (* args , ** kwargs )
172+ res = await func (self , * args , ** kwargs )
173+ await retry_policy .record_success (retry = attempt > 0 )
174+ return res
99175 except PyMongoError as exc :
100176 if not exc .has_error_label ("Retryable" ):
101177 raise
102178 attempt += 1
103- if attempt > _MAX_RETRIES :
179+ if not await retry_policy . should_retry ( attempt ) :
104180 raise
105181
106182 # Implement exponential backoff on retry.
107183 if exc .has_error_label ("SystemOverloaded" ):
108- await _backoff (attempt )
184+ await retry_policy . backoff (attempt )
109185 continue
110186
111187 return cast (F , inner )
0 commit comments