3232#include < folly/TokenBucket.h>
3333
3434#include " mongo/logv2/log.h"
35+ #include " mongo/util/duration.h"
36+ #include " mongo/util/scopeguard.h"
3537
3638#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
3739
@@ -46,37 +48,42 @@ Milliseconds doubleToMillis(double t) {
4648} // namespace
4749
4850struct RateLimiter ::RateLimiterPrivate {
49- RateLimiterPrivate (
50- double r, double b, int64_t m, std::string n, std::unique_ptr<RateLimiter::Stats> s)
51- : tokenBucket{r, b}, maxQueueDepth(m), name(std::move(n)), stats(std::move(s)) {}
51+ RateLimiterPrivate (double r, double b, int64_t m, std::string n)
52+ // Initialize the token bucket with one "burst" of tokens. The third parameter to
53+ // tokenBucket's constructor ("zeroTime") is interpreted as a number of seconds from the
54+ // epoch of the clock used by the token bucket. The clock is
55+ // `std::chrono::steady_clock`, whose epoch is unspecified but is usually the boot time
56+ // of the machine. Rather than have an initial accumulation of tokens based on some
57+ // unknown point in the past, set the zero time to a known time in the past: enough time
58+ // for burst size (b) tokens to have accumulated.
59+ : tokenBucket{r, b, folly::TokenBucket::defaultClockNow () - b / r},
60+ maxQueueDepth (m),
61+ queued (0 ),
62+ name (std::move(n)) {}
5263
5364 WriteRarelyRWMutex rwMutex;
5465 folly::TokenBucket tokenBucket;
5566
67+ Stats stats;
68+
5669 Atomic<int64_t > maxQueueDepth;
57- Atomic<int64_t > numWaiters ;
70+ Atomic<int64_t > queued ;
5871
5972 std::string name;
6073
61- /* *
62- * Users may inherit from the RateLimiter::Stats class and define their own stats to track.
63- */
64- std::unique_ptr<Stats> stats;
65-
66- Status rejectIfOverQueueLimit ( double nWaiters) {
67- auto maxDepth = maxQueueDepth.loadRelaxed ();
68- if ( MONGO_unlikely (nWaiters >= maxDepth)) {
69- {
70- auto lk = rwMutex. readLock ();
71- tokenBucket. returnTokens ( 1.0 );
74+ Status enqueue () {
75+ const auto maxDepth = maxQueueDepth. loadRelaxed ();
76+ int64_t expected = queued. load ();
77+ do {
78+ if (expected >= maxDepth) {
79+ return Status (ErrorCodes::TemporarilyUnavailable,
80+ fmt::format ( " RateLimiter queue depth has exceeded the maxQueueDepth. "
81+ " numWaiters={}; maxQueueDepth={}; rateLimiterName={} " ,
82+ expected,
83+ maxDepth,
84+ name) );
7285 }
73- return Status (ErrorCodes::TemporarilyUnavailable,
74- fmt::format (" RateLimiter queue depth has exceeded the maxQueueDepth. "
75- " numWaiters={}; maxQueueDepth={}; rateLimiterName={}" ,
76- nWaiters,
77- maxDepth,
78- name));
79- }
86+ } while (!queued.compareAndSwap (&expected, expected + 1 ));
8087
8188 return Status::OK ();
8289 }
@@ -85,20 +92,21 @@ struct RateLimiter::RateLimiterPrivate {
8592RateLimiter::RateLimiter (double refreshRatePerSec,
8693 double burstSize,
8794 int64_t maxQueueDepth,
88- std::string name,
89- std::unique_ptr<Stats> stats) {
95+ std::string name) {
9096 uassert (ErrorCodes::InvalidOptions,
9197 fmt::format (" burstSize cannot be less than 1.0. burstSize={}; rateLimiterName={}" ,
9298 burstSize,
9399 name),
94100 burstSize >= 1.0 );
95101 _impl = std::make_unique<RateLimiterPrivate>(
96- refreshRatePerSec, burstSize, maxQueueDepth, std::move (name), std::move (stats) );
102+ refreshRatePerSec, burstSize, maxQueueDepth, std::move (name));
97103}
98104
99105RateLimiter::~RateLimiter () = default ;
100106
101107Status RateLimiter::acquireToken (OperationContext* opCtx) {
108+ _impl->stats .attemptedAdmissions .incrementRelaxed ();
109+
102110 // The consumeWithBorrowNonBlocking API consumes a token (possibly leading to a negative
103111 // bucket balance), and returns how long the consumer should nap until their token
104112 // reservation becomes valid.
@@ -111,12 +119,15 @@ Status RateLimiter::acquireToken(OperationContext* opCtx) {
111119 }
112120
113121 if (auto napTime = doubleToMillis (waitForTokenSecs); napTime > Milliseconds{0 }) {
114- auto nWaiters = _impl->numWaiters .fetchAndAdd (1 );
115- ON_BLOCK_EXIT ([&] { _impl->numWaiters .fetchAndSubtract (1 ); });
116- if (auto s = _impl->rejectIfOverQueueLimit (nWaiters); !s.isOK ()) {
117- return s;
122+ if (auto status = _impl->enqueue (); !status.isOK ()) {
123+ _impl->stats .rejectedAdmissions .incrementRelaxed ();
124+ return status;
118125 }
119-
126+ _impl->stats .addedToQueue .incrementRelaxed ();
127+ ON_BLOCK_EXIT ([&] {
128+ _impl->stats .removedFromQueue .incrementRelaxed ();
129+ _impl->queued .fetchAndSubtract (1 );
130+ });
120131 try {
121132 LOGV2_DEBUG (10550200 ,
122133 4 ,
@@ -125,6 +136,7 @@ Status RateLimiter::acquireToken(OperationContext* opCtx) {
125136 " napTimeMillis" _attr = napTime.toString ());
126137 opCtx->sleepFor (napTime);
127138 } catch (const DBException& e) {
139+ _impl->stats .interruptedInQueue .incrementRelaxed ();
128140 LOGV2_DEBUG (10440800 ,
129141 4 ,
130142 " Interrupted while waiting in rate limiter queue" ,
@@ -139,15 +151,14 @@ Status RateLimiter::acquireToken(OperationContext* opCtx) {
139151 _impl->name ));
140152 }
141153 }
142- return Status::OK ();
143- }
144154
145- RateLimiter::Stats* RateLimiter::getRateLimiterStats () {
146- return _impl->stats .get ();
155+ _impl->stats .successfulAdmissions .incrementRelaxed ();
156+ _impl->stats .averageTimeQueuedMicros .addSample (waitForTokenSecs * 1'000'000 );
157+ return Status::OK ();
147158}
148159
149- int64_t RateLimiter::getNumWaiters () {
150- return _impl->numWaiters . load ();
160+ void RateLimiter::recordExemption () {
161+ _impl->stats . exemptedAdmissions . incrementRelaxed ();
151162}
152163
153164void RateLimiter::setRefreshRatePerSec (double refreshRatePerSec) {
@@ -169,4 +180,33 @@ void RateLimiter::setBurstSize(double burstSize) {
169180void RateLimiter::setMaxQueueDepth (int64_t maxQueueDepth) {
170181 _impl->maxQueueDepth .storeRelaxed (maxQueueDepth);
171182}
183+
184+ const RateLimiter::Stats& RateLimiter::stats () const {
185+ return _impl->stats ;
186+ }
187+
188+ void RateLimiter::appendStats (BSONObjBuilder* bob) const {
189+ invariant (bob);
190+ bob->append (" addedToQueue" , stats ().addedToQueue .get ());
191+ bob->append (" removedFromQueue" , stats ().removedFromQueue .get ());
192+ bob->append (" interruptedInQueue" , stats ().interruptedInQueue .get ());
193+ bob->append (" rejectedAdmissions" , stats ().rejectedAdmissions .get ());
194+ bob->append (" exemptedAdmissions" , stats ().exemptedAdmissions .get ());
195+ bob->append (" successfulAdmissions" , stats ().successfulAdmissions .get ());
196+ bob->append (" attemptedAdmissions" , stats ().attemptedAdmissions .get ());
197+ if (const auto avg = stats ().averageTimeQueuedMicros .get ()) {
198+ bob->append (" averageTimeQueuedMicros" , *avg);
199+ }
200+ bob->append (" totalAvailableTokens" , tokensAvailable ());
201+ }
202+
203+ double RateLimiter::tokensAvailable () const {
204+ auto lk = _impl->rwMutex .readLock ();
205+ return _impl->tokenBucket .available ();
206+ }
207+
208+ int64_t RateLimiter::queued () const {
209+ return _impl->queued .load ();
210+ }
211+
172212} // namespace mongo::admission
0 commit comments