@@ -139,6 +139,10 @@ export class MarQS {
139139 return this . redis . zcard ( this . keys . queueKey ( env , queue , concurrencyKey ) ) ;
140140 }
141141
142+ public async lengthOfEnvQueue ( env : AuthenticatedEnvironment ) {
143+ return this . redis . zcard ( this . keys . envQueueKey ( env ) ) ;
144+ }
145+
142146 public async oldestMessageInQueue (
143147 env : AuthenticatedEnvironment ,
144148 queue : string ,
@@ -1074,6 +1078,7 @@ export class MarQS {
10741078 concurrencyKey ,
10751079 envConcurrencyKey ,
10761080 orgConcurrencyKey ,
1081+ this . keys . envQueueKeyFromQueue ( message . queue ) ,
10771082 message . queue ,
10781083 message . messageId ,
10791084 JSON . stringify ( message ) ,
@@ -1111,6 +1116,7 @@ export class MarQS {
11111116 currentConcurrencyKey ,
11121117 envCurrentConcurrencyKey ,
11131118 orgCurrentConcurrencyKey ,
1119+ this . keys . envQueueKeyFromQueue ( messageQueue ) ,
11141120 messageQueue ,
11151121 String ( Date . now ( ) ) ,
11161122 String ( this . options . defaultEnvConcurrency ) ,
@@ -1187,6 +1193,7 @@ export class MarQS {
11871193 concurrencyKey ,
11881194 envConcurrencyKey ,
11891195 orgConcurrencyKey ,
1196+ this . keys . envQueueKeyFromQueue ( messageQueue ) ,
11901197 messageId ,
11911198 messageQueue
11921199 ) ;
@@ -1234,6 +1241,7 @@ export class MarQS {
12341241 envConcurrencyKey ,
12351242 orgConcurrencyKey ,
12361243 visibilityQueue ,
1244+ this . keys . envQueueKeyFromQueue ( messageQueue ) ,
12371245 messageQueue ,
12381246 messageId ,
12391247 String ( Date . now ( ) ) ,
@@ -1347,14 +1355,15 @@ export class MarQS {
13471355
13481356 #registerCommands( ) {
13491357 this . redis . defineCommand ( "enqueueMessage" , {
1350- numberOfKeys : 6 ,
1358+ numberOfKeys : 7 ,
13511359 lua : `
13521360local queue = KEYS[1]
13531361local parentQueue = KEYS[2]
13541362local messageKey = KEYS[3]
13551363local concurrencyKey = KEYS[4]
13561364local envCurrentConcurrencyKey = KEYS[5]
13571365local orgCurrentConcurrencyKey = KEYS[6]
1366+ local envQueue = KEYS[7]
13581367
13591368local queueName = ARGV[1]
13601369local messageId = ARGV[2]
@@ -1367,6 +1376,9 @@ redis.call('SET', messageKey, messageData)
13671376-- Add the message to the queue
13681377redis.call('ZADD', queue, messageScore, messageId)
13691378
1379+ -- Add the message to the env queue
1380+ redis.call('ZADD', envQueue, messageScore, messageId)
1381+
13701382-- Rebalance the parent queue
13711383local earliestMessage = redis.call('ZRANGE', queue, 0, 0, 'WITHSCORES')
13721384if #earliestMessage == 0 then
@@ -1383,7 +1395,7 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId)
13831395 } ) ;
13841396
13851397 this . redis . defineCommand ( "dequeueMessage" , {
1386- numberOfKeys : 8 ,
1398+ numberOfKeys : 9 ,
13871399 lua : `
13881400-- Keys: childQueue, parentQueue, concurrencyLimitKey, envConcurrencyLimitKey, orgConcurrencyLimitKey, currentConcurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey
13891401local childQueue = KEYS[1]
@@ -1394,6 +1406,7 @@ local orgConcurrencyLimitKey = KEYS[5]
13941406local currentConcurrencyKey = KEYS[6]
13951407local envCurrentConcurrencyKey = KEYS[7]
13961408local orgCurrentConcurrencyKey = KEYS[8]
1409+ local envQueueKey = KEYS[9]
13971410
13981411-- Args: childQueueName, currentTime, defaultEnvConcurrencyLimit, defaultOrgConcurrencyLimit
13991412local childQueueName = ARGV[1]
@@ -1438,6 +1451,7 @@ local messageScore = tonumber(messages[2])
14381451
14391452-- Move message to timeout queue and update concurrency
14401453redis.call('ZREM', childQueue, messageId)
1454+ redis.call('ZREM', envQueueKey, messageId)
14411455redis.call('SADD', currentConcurrencyKey, messageId)
14421456redis.call('SADD', envCurrentConcurrencyKey, messageId)
14431457redis.call('SADD', orgCurrentConcurrencyKey, messageId)
@@ -1474,7 +1488,7 @@ redis.call('SET', messageKey, messageData, 'GET')
14741488 } ) ;
14751489
14761490 this . redis . defineCommand ( "acknowledgeMessage" , {
1477- numberOfKeys : 7 ,
1491+ numberOfKeys : 8 ,
14781492 lua : `
14791493-- Keys: parentQueue, messageKey, messageQueue, visibilityQueue, concurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey
14801494local parentQueue = KEYS[1]
@@ -1484,6 +1498,7 @@ local visibilityQueue = KEYS[4]
14841498local concurrencyKey = KEYS[5]
14851499local envCurrentConcurrencyKey = KEYS[6]
14861500local orgCurrentConcurrencyKey = KEYS[7]
1501+ local envQueueKey = KEYS[8]
14871502
14881503-- Args: messageId, messageQueueName
14891504local messageId = ARGV[1]
@@ -1495,6 +1510,9 @@ redis.call('DEL', messageKey)
14951510-- Remove the message from the queue
14961511redis.call('ZREM', messageQueue, messageId)
14971512
1513+ -- Remove the message from the env queue
1514+ redis.call('ZREM', envQueueKey, messageId)
1515+
14981516-- Rebalance the parent queue
14991517local earliestMessage = redis.call('ZRANGE', messageQueue, 0, 0, 'WITHSCORES')
15001518if #earliestMessage == 0 then
@@ -1514,7 +1532,7 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId)
15141532 } ) ;
15151533
15161534 this . redis . defineCommand ( "nackMessage" , {
1517- numberOfKeys : 7 ,
1535+ numberOfKeys : 8 ,
15181536 lua : `
15191537-- Keys: childQueueKey, parentQueueKey, visibilityQueue, concurrencyKey, envConcurrencyKey, orgConcurrencyKey, messageId
15201538local messageKey = KEYS[1]
@@ -1524,6 +1542,7 @@ local concurrencyKey = KEYS[4]
15241542local envConcurrencyKey = KEYS[5]
15251543local orgConcurrencyKey = KEYS[6]
15261544local visibilityQueue = KEYS[7]
1545+ local envQueueKey = KEYS[8]
15271546
15281547-- Args: childQueueName, messageId, currentTime, messageScore
15291548local childQueueName = ARGV[1]
@@ -1547,6 +1566,9 @@ end
15471566-- Enqueue the message into the queue
15481567redis.call('ZADD', childQueueKey, messageScore, messageId)
15491568
1569+ -- Enqueue the message into the env queue
1570+ redis.call('ZADD', envQueueKey, messageScore, messageId)
1571+
15501572-- Rebalance the parent queue
15511573local earliestMessage = redis.call('ZRANGE', childQueueKey, 0, 0, 'WITHSCORES')
15521574if #earliestMessage == 0 then
@@ -1729,6 +1751,7 @@ declare module "ioredis" {
17291751 concurrencyKey : string ,
17301752 envConcurrencyKey : string ,
17311753 orgConcurrencyKey : string ,
1754+ envQueue : string ,
17321755 queueName : string ,
17331756 messageId : string ,
17341757 messageData : string ,
@@ -1745,6 +1768,7 @@ declare module "ioredis" {
17451768 currentConcurrencyKey : string ,
17461769 envCurrentConcurrencyKey : string ,
17471770 orgCurrentConcurrencyKey : string ,
1771+ envQueueKey : string ,
17481772 childQueueName : string ,
17491773 currentTime : string ,
17501774 defaultEnvConcurrencyLimit : string ,
@@ -1766,6 +1790,7 @@ declare module "ioredis" {
17661790 concurrencyKey : string ,
17671791 envConcurrencyKey : string ,
17681792 orgConcurrencyKey : string ,
1793+ envQueueKey : string ,
17691794 messageId : string ,
17701795 messageQueueName : string ,
17711796 callback ?: Callback < void >
@@ -1779,6 +1804,7 @@ declare module "ioredis" {
17791804 envConcurrencyKey : string ,
17801805 orgConcurrencyKey : string ,
17811806 visibilityQueue : string ,
1807+ envQueueKey : string ,
17821808 childQueueName : string ,
17831809 messageId : string ,
17841810 currentTime : string ,
0 commit comments