1- import { SinglyLinkedList , DoublyLinkedNode , DoublyLinkedList } from './linked-list' ;
1+ import { DoublyLinkedNode , DoublyLinkedList , EmptyAwareSinglyLinkedList } from './linked-list' ;
22import encodeCommand from '../RESP/encoder' ;
33import { Decoder , PUSH_TYPE_MAPPING , RESP_TYPES } from '../RESP/decoder' ;
44import { TypeMapping , ReplyUnion , RespVersions , RedisArgument } from '../RESP/types' ;
55import { ChannelListeners , PubSub , PubSubCommand , PubSubListener , PubSubType , PubSubTypeListeners } from './pub-sub' ;
6- import { AbortError , ErrorReply , TimeoutError } from '../errors' ;
6+ import { AbortError , ErrorReply , CommandTimeoutDuringMaintenanceError , TimeoutError } from '../errors' ;
77import { MonitorCallback } from '.' ;
8+ import { dbgMaintenance } from './enterprise-maintenance-manager' ;
89
910export interface CommandOptions < T = TypeMapping > {
1011 chainId ?: symbol ;
@@ -30,6 +31,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
3031 timeout : {
3132 signal : AbortSignal ;
3233 listener : ( ) => unknown ;
34+ originalTimeout : number | undefined ;
3335 } | undefined ;
3436}
3537
@@ -50,22 +52,74 @@ const RESP2_PUSH_TYPE_MAPPING = {
5052 [ RESP_TYPES . SIMPLE_STRING ] : Buffer
5153} ;
5254
55+ // Try to handle a push notification. Return whether you
56+ // successfully consumed the notification or not. This is
57+ // important in order for the queue to be able to pass the
58+ // notification to another handler if the current one did not
59+ // succeed.
60+ type PushHandler = ( pushItems : Array < any > ) => boolean ;
61+
5362export default class RedisCommandsQueue {
5463 readonly #respVersion;
5564 readonly #maxLength;
5665 readonly #toWrite = new DoublyLinkedList < CommandToWrite > ( ) ;
57- readonly #waitingForReply = new SinglyLinkedList < CommandWaitingForReply > ( ) ;
66+ readonly #waitingForReply = new EmptyAwareSinglyLinkedList < CommandWaitingForReply > ( ) ;
5867 readonly #onShardedChannelMoved;
5968 #chainInExecution: symbol | undefined ;
6069 readonly decoder ;
6170 readonly #pubSub = new PubSub ( ) ;
6271
72+ #pushHandlers: PushHandler [ ] = [ this . #onPush. bind ( this ) ] ;
73+
74+ #maintenanceCommandTimeout: number | undefined
75+
76+ setMaintenanceCommandTimeout ( ms : number | undefined ) {
77+ // Prevent possible api misuse
78+ if ( this . #maintenanceCommandTimeout === ms ) {
79+ dbgMaintenance ( `Queue already set maintenanceCommandTimeout to ${ ms } , skipping` ) ;
80+ return ;
81+ } ;
82+
83+ dbgMaintenance ( `Setting maintenance command timeout to ${ ms } ` ) ;
84+ this . #maintenanceCommandTimeout = ms ;
85+
86+ if ( this . #maintenanceCommandTimeout === undefined ) {
87+ dbgMaintenance ( `Queue will keep maintenanceCommandTimeout for exisitng commands, just to be on the safe side. New commands will receive normal timeouts` ) ;
88+ return ;
89+ }
90+
91+ let counter = 0 ;
92+ const total = this . #toWrite. length ;
93+
94+ // Overwrite timeouts of all eligible toWrite commands
95+ for ( const node of this . #toWrite. nodes ( ) ) {
96+ const command = node . value ;
97+
98+ // Remove timeout listener if it exists
99+ RedisCommandsQueue . #removeTimeoutListener( command )
100+
101+ counter ++ ;
102+ const newTimeout = this . #maintenanceCommandTimeout;
103+
104+ // Overwrite the command's timeout
105+ const signal = AbortSignal . timeout ( newTimeout ) ;
106+ command . timeout = {
107+ signal,
108+ listener : ( ) => {
109+ this . #toWrite. remove ( node ) ;
110+ command . reject ( new CommandTimeoutDuringMaintenanceError ( newTimeout ) ) ;
111+ } ,
112+ originalTimeout : command . timeout ?. originalTimeout
113+ } ;
114+ signal . addEventListener ( 'abort' , command . timeout . listener , { once : true } ) ;
115+ } ;
116+ dbgMaintenance ( `Total of ${ counter } of ${ total } timeouts reset to ${ ms } ` ) ;
117+ }
118+
63119 get isPubSubActive ( ) {
64120 return this . #pubSub. isActive ;
65121 }
66122
67- #invalidateCallback?: ( key : RedisArgument | null ) => unknown ;
68-
69123 constructor (
70124 respVersion : RespVersions ,
71125 maxLength : number | null | undefined ,
@@ -107,6 +161,7 @@ export default class RedisCommandsQueue {
107161 }
108162 return true ;
109163 }
164+ return false
110165 }
111166
112167 #getTypeMapping( ) {
@@ -119,30 +174,27 @@ export default class RedisCommandsQueue {
119174 onErrorReply : err => this . #onErrorReply( err ) ,
120175 //TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used
121176 onPush : push => {
122- if ( ! this . #onPush( push ) ) {
123- // currently only supporting "invalidate" over RESP3 push messages
124- switch ( push [ 0 ] . toString ( ) ) {
125- case "invalidate" : {
126- if ( this . #invalidateCallback) {
127- if ( push [ 1 ] !== null ) {
128- for ( const key of push [ 1 ] ) {
129- this . #invalidateCallback( key ) ;
130- }
131- } else {
132- this . #invalidateCallback( null ) ;
133- }
134- }
135- break ;
136- }
137- }
177+ for ( const pushHandler of this . #pushHandlers) {
178+ if ( pushHandler ( push ) ) return
138179 }
139180 } ,
140181 getTypeMapping : ( ) => this . #getTypeMapping( )
141182 } ) ;
142183 }
143184
144- setInvalidateCallback ( callback ?: ( key : RedisArgument | null ) => unknown ) {
145- this . #invalidateCallback = callback ;
185+ addPushHandler ( handler : PushHandler ) : void {
186+ this . #pushHandlers. push ( handler ) ;
187+ }
188+
189+ async waitForInflightCommandsToComplete ( ) : Promise < void > {
190+ // In-flight commands already completed
191+ if ( this . #waitingForReply. length === 0 ) {
192+ return
193+ } ;
194+ // Otherwise wait for in-flight commands to fire `empty` event
195+ return new Promise ( resolve => {
196+ this . #waitingForReply. events . on ( 'empty' , resolve )
197+ } ) ;
146198 }
147199
148200 addCommand < T > (
@@ -168,15 +220,20 @@ export default class RedisCommandsQueue {
168220 typeMapping : options ?. typeMapping
169221 } ;
170222
171- const timeout = options ?. timeout ;
223+ // If #maintenanceCommandTimeout was explicitly set, we should
224+ // use it instead of the timeout provided by the command
225+ const timeout = this . #maintenanceCommandTimeout ?? options ?. timeout ;
226+ const wasInMaintenance = this . #maintenanceCommandTimeout !== undefined ;
172227 if ( timeout ) {
228+
173229 const signal = AbortSignal . timeout ( timeout ) ;
174230 value . timeout = {
175231 signal,
176232 listener : ( ) => {
177233 this . #toWrite. remove ( node ) ;
178- value . reject ( new TimeoutError ( ) ) ;
179- }
234+ value . reject ( wasInMaintenance ? new CommandTimeoutDuringMaintenanceError ( timeout ) : new TimeoutError ( ) ) ;
235+ } ,
236+ originalTimeout : options ?. timeout
180237 } ;
181238 signal . addEventListener ( 'abort' , value . timeout . listener , { once : true } ) ;
182239 }
@@ -432,7 +489,7 @@ export default class RedisCommandsQueue {
432489 }
433490
434491 static #removeTimeoutListener( command : CommandToWrite ) {
435- command . timeout ! . signal . removeEventListener ( 'abort' , command . timeout ! . listener ) ;
492+ command . timeout ? .signal . removeEventListener ( 'abort' , command . timeout ! . listener ) ;
436493 }
437494
438495 static #flushToWrite( toBeSent : CommandToWrite , err : Error ) {
0 commit comments