11import Redis from 'ioredis' ;
2- import { RateLimiter , RateLimiterResponse } from '../@types/rateLimit' ;
2+ import { RateLimiter , RateLimiterResponse , RedisBucket , RedisLog } from '../@types/rateLimit' ;
33
44/**
55 * The SlidingWindowLog instance of a RateLimiter limits requests based on a unique user ID.
@@ -13,7 +13,7 @@ import { RateLimiter, RateLimiterResponse } from '../@types/rateLimit';
1313 * 2. Any requests that are older than window size are dropped from the log.
1414 * 3. The complexity of the current request is added to the complexity of all requests in the log.
1515 * 4. If the request exceeds the specified capacity it is dropped.
16- * 5. Otherwise the request is allowed and ther current request is added to the log.
16+ * 5. Otherwise the request is allowed and the current request is added to the end of the log (if it has a complexity > 0) .
1717 */
1818class SlidingWindowLog implements RateLimiter {
1919 private windowSize : number ;
@@ -33,7 +33,40 @@ class SlidingWindowLog implements RateLimiter {
3333 this . capacity = capacity ;
3434 this . client = client ;
3535 if ( windowSize <= 0 || capacity <= 0 )
36- throw SyntaxError ( 'SlidingWindowLog windowSize and capacity must be positive' ) ;
36+ throw SyntaxError ( 'SlidingWindowLog window size and capacity must be positive' ) ;
37+
38+ // TODO: Define lua script for server side computation using either sorted sets or lists
39+ // while x.timestamp + window_size < timestamp lpop
40+ // //https://stackoverflow.com/questions/35677682/filtering-deleting-items-from-a-redis-set
41+ // this.client.defineCommand('popWindow', {
42+ // // 2 value timestamp and complexity of this request
43+ // lua: `
44+ // local totalComplexity = 0 -- complexity of active requests
45+ // local expiredMembers = 0 -- number of requests to remove
46+ // local key = keys[1] -- uuid
47+ // local current_time = keys[2]
48+
49+ // for index, value in next, redis.call(key, ????) do
50+ // -- string comparisson of timestamps
51+ // if .... then
52+
53+ // else
54+ // totalComplexity += ????
55+ // end
56+ // end
57+
58+ // redis.call(pop, ???)
59+
60+ // if total_complexity < window_size then
61+ // then
62+ // end
63+ // return {
64+
65+ // }
66+ // `,
67+ // numberOfKeys: 3, // uuid
68+ // readOnly: true,
69+ // });
3770 }
3871
3972 /**
@@ -50,9 +83,63 @@ class SlidingWindowLog implements RateLimiter {
5083 ) : Promise < RateLimiterResponse > {
5184 // set the expiry of key-value pairs in the cache to 24 hours
5285 const keyExpiry = 86400000 ; // TODO: Make this a global for consistency across each algo.
53- if ( tokens > this . capacity ) return { success : false , tokens : this . capacity } ;
5486
55- throw new Error ( 'SlidingWindowLog.processRequest not implemented' ) ;
87+ // Each user's log is represented by a redis list with a score = request timestamp
88+ // and a value equal to the complexity
89+ // Drop expired requests from the log. represented by a sorted set in redis
90+
91+ // Get the log from redis
92+ let requestLog : RedisLog = JSON . parse ( ( await this . client . get ( uuid ) ) || '[]' ) ;
93+
94+ // Iterate through the list in reverse and count active tokens
95+ // This allows us to track the threshold for when this request would be allowed if it is blocked
96+ // Stop at the first timestamp that's expired and cut the rest.
97+
98+ const cutoff = timestamp - this . windowSize ;
99+ let tokensInLog = 0 ; // total active tokens in the log
100+ let cutoffIndex = 0 ; // index of oldest active request
101+ let lastAllowedIndex = requestLog . length ; // Index of oldest request in the log for which this request would be allowed.
102+
103+ for ( let index = requestLog . length - 1 ; index >= 0 ; index -- ) {
104+ if ( cutoff >= requestLog [ index ] . timestamp ) {
105+ // we reached the first expired request
106+ cutoffIndex = index + 1 ;
107+ break ;
108+ } else {
109+ // the request is active
110+ tokensInLog += requestLog [ index ] . tokens ;
111+ if ( this . capacity - tokensInLog >= tokens ) {
112+ // the log is able to accept the current request
113+ lastAllowedIndex = index ;
114+ }
115+ }
116+ }
117+
118+ // Time (ms) after which the current request would succeed if it is blocked.
119+ let retryAfter : number ;
120+
121+ // Request will never be allowed
122+ if ( tokens > this . capacity ) retryAfter = Infinity ;
123+ // need the request before lastAllowedIndex
124+ else if ( lastAllowedIndex > 0 )
125+ retryAfter = this . windowSize + requestLog [ lastAllowedIndex - 1 ] . timestamp ;
126+ else retryAfter = 0 ; // request is allowed
127+
128+ // Conditional check to avoid unecessary slice
129+ if ( cutoffIndex > 0 ) requestLog = requestLog . slice ( cutoffIndex ) ;
130+
131+ // allow/disallow current request
132+ if ( tokensInLog + tokens <= this . capacity ) {
133+ // update the log
134+ if ( tokens > 0 ) requestLog . push ( { timestamp, tokens } ) ;
135+ await this . client . setex ( uuid , keyExpiry , JSON . stringify ( requestLog ) ) ;
136+ tokensInLog += tokens ;
137+ return { success : true , tokens : this . capacity - tokensInLog } ;
138+ }
139+
140+ await this . client . setex ( uuid , keyExpiry , JSON . stringify ( requestLog ) ) ;
141+
142+ return { success : false , tokens : this . capacity - tokensInLog , retryAfter } ;
56143 }
57144
58145 /**
0 commit comments