1- import { SinglyLinkedList , DoublyLinkedNode , DoublyLinkedList } from './linked-list' ;
1+ import { DoublyLinkedNode , DoublyLinkedList , EmptyAwareSinglyLinkedList } from './linked-list' ;
22import encodeCommand from '../RESP/encoder' ;
33import { Decoder , PUSH_TYPE_MAPPING , RESP_TYPES } from '../RESP/decoder' ;
44import { TypeMapping , ReplyUnion , RespVersions , RedisArgument } from '../RESP/types' ;
55import { ChannelListeners , PubSub , PubSubCommand , PubSubListener , PubSubType , PubSubTypeListeners } from './pub-sub' ;
6- import { AbortError , ErrorReply , TimeoutError } from '../errors' ;
6+ import { AbortError , ErrorReply , CommandTimeoutDuringMaintananceError , TimeoutError } from '../errors' ;
77import { MonitorCallback } from '.' ;
88
99export interface CommandOptions < T = TypeMapping > {
@@ -30,6 +30,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
3030 timeout : {
3131 signal : AbortSignal ;
3232 listener : ( ) => unknown ;
33+ originalTimeout : number | undefined ;
3334 } | undefined ;
3435}
3536
@@ -61,19 +62,62 @@ export default class RedisCommandsQueue {
6162 readonly #respVersion;
6263 readonly #maxLength;
6364 readonly #toWrite = new DoublyLinkedList < CommandToWrite > ( ) ;
64- readonly #waitingForReply = new SinglyLinkedList < CommandWaitingForReply > ( ) ;
65+ readonly #waitingForReply = new EmptyAwareSinglyLinkedList < CommandWaitingForReply > ( ) ;
6566 readonly #onShardedChannelMoved;
6667 #chainInExecution: symbol | undefined ;
6768 readonly decoder ;
6869 readonly #pubSub = new PubSub ( ) ;
6970
7071 #pushHandlers: PushHandler [ ] = [ this . #onPush. bind ( this ) ] ;
72+
73+ #inMaintenance = false ;
74+
75+ set inMaintenance ( value : boolean ) {
76+ this . #inMaintenance = value ;
77+ }
78+
79+ #maintenanceCommandTimeout: number | undefined
80+
81+ setMaintenanceCommandTimeout ( ms : number | undefined ) {
82+ // Prevent possible api misuse
83+ if ( this . #maintenanceCommandTimeout === ms ) return ;
84+
85+ this . #maintenanceCommandTimeout = ms ;
86+
87+ let counter = 0 ;
88+
89+ // Overwrite timeouts of all eligible toWrite commands
90+ for ( const node of this . #toWrite. nodes ( ) ) {
91+ const command = node . value ;
92+
93+ // Remove timeout listener if it exists
94+ RedisCommandsQueue . #removeTimeoutListener( command )
95+
96+ // Determine newTimeout
97+ const newTimeout = this . #maintenanceCommandTimeout ?? command . timeout ?. originalTimeout ;
98+ // if no timeout is given and the command didnt have any timeout before, skip
99+ if ( ! newTimeout ) return ;
100+
101+ counter ++ ;
102+
103+ // Overwrite the command's timeout
104+ const signal = AbortSignal . timeout ( newTimeout ) ;
105+ command . timeout = {
106+ signal,
107+ listener : ( ) => {
108+ this . #toWrite. remove ( node ) ;
109+ command . reject ( this . #inMaintenance ? new CommandTimeoutDuringMaintananceError ( newTimeout ) : new TimeoutError ( ) ) ;
110+ } ,
111+ originalTimeout : command . timeout ?. originalTimeout
112+ } ;
113+ signal . addEventListener ( 'abort' , command . timeout . listener , { once : true } ) ;
114+ } ;
115+ }
116+
71117 get isPubSubActive ( ) {
72118 return this . #pubSub. isActive ;
73119 }
74120
75- #invalidateCallback?: ( key : RedisArgument | null ) => unknown ;
76-
77121 constructor (
78122 respVersion : RespVersions ,
79123 maxLength : number | null | undefined ,
@@ -174,15 +218,19 @@ export default class RedisCommandsQueue {
174218 typeMapping : options ?. typeMapping
175219 } ;
176220
177- const timeout = options ?. timeout ;
221+ // If #maintenanceCommandTimeout was explicitly set, we should
222+ // use it instead of the timeout provided by the command
223+ const timeout = this . #maintenanceCommandTimeout || options ?. timeout
178224 if ( timeout ) {
225+
179226 const signal = AbortSignal . timeout ( timeout ) ;
180227 value . timeout = {
181228 signal,
182229 listener : ( ) => {
183230 this . #toWrite. remove ( node ) ;
184- value . reject ( new TimeoutError ( ) ) ;
185- }
231+ value . reject ( this . #inMaintenance ? new CommandTimeoutDuringMaintananceError ( timeout ) : new TimeoutError ( ) ) ;
232+ } ,
233+ originalTimeout : options ?. timeout
186234 } ;
187235 signal . addEventListener ( 'abort' , value . timeout . listener , { once : true } ) ;
188236 }
@@ -438,7 +486,7 @@ export default class RedisCommandsQueue {
438486 }
439487
440488 static #removeTimeoutListener( command : CommandToWrite ) {
441- command . timeout ! . signal . removeEventListener ( 'abort' , command . timeout ! . listener ) ;
489+ command . timeout ? .signal . removeEventListener ( 'abort' , command . timeout ! . listener ) ;
442490 }
443491
444492 static #flushToWrite( toBeSent : CommandToWrite , err : Error ) {
0 commit comments