@@ -32,16 +32,16 @@ public struct RedisCommandContext {
3232/// A `ChannelDuplexHandler` that works with `RedisCommandContext`s to send commands and forward responses.
3333open class RedisCommandHandler {
3434 /// Queue of promises waiting to receive a response value from a sent command.
35- private var commandResponseQueue : [ EventLoopPromise < RESPValue > ]
35+ private var commandResponseQueue : CircularBuffer < EventLoopPromise < RESPValue > >
3636 private var logger : Logger
3737
3838 deinit {
3939 guard commandResponseQueue. count > 0 else { return }
4040 logger. warning ( " Command handler deinit when queue is not empty. Current size: \( commandResponseQueue. count) " )
4141 }
4242
43- public init ( logger: Logger = Logger ( label: " RedisNIO.CommandHandler " ) ) {
44- self . commandResponseQueue = [ ]
43+ public init ( logger: Logger = Logger ( label: " RedisNIO.CommandHandler " ) , initialQueueCapacity : Int = 5 ) {
44+ self . commandResponseQueue = CircularBuffer ( initialCapacity : initialQueueCapacity )
4545 self . logger = logger
4646 self . logger [ metadataKey: " CommandHandler " ] = " \( UUID ( ) ) "
4747 }
@@ -58,30 +58,31 @@ extension RedisCommandHandler: ChannelInboundHandler {
5858 ///
5959 /// See `ChannelInboundHandler.errorCaught(context:error:)`
6060 public func errorCaught( context: ChannelHandlerContext , error: Error ) {
61- guard let leadPromise = commandResponseQueue. last else {
62- return assertionFailure ( " Received unexpected error while idle: \( error. localizedDescription) " )
63- }
64- leadPromise. fail ( error)
61+ let queue = self . commandResponseQueue
62+
63+ assert ( queue. count > 0 , " Received unexpected error while idle: \( error. localizedDescription) " )
64+
65+ self . commandResponseQueue. removeAll ( )
66+ queue. forEach { $0. fail ( error) }
67+
68+ logger. critical ( " Error in channel pipeline. " , metadata: [ " error " : . string( error. localizedDescription) ] )
69+
6570 context. fireErrorCaught ( error)
66- RedisMetrics . commandFailureCount. increment ( )
6771 }
6872
6973 /// Invoked by NIO when a read has been fired from earlier in the response chain. This forwards the unwrapped
7074 /// `RESPValue` to the promise awaiting a response at the front of the queue.
7175 ///
7276 /// See `ChannelInboundHandler.channelRead(context:data:)`
7377 public func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
74- let value = unwrapInboundIn ( data)
78+ let value = self . unwrapInboundIn ( data)
7579
76- guard let leadPromise = commandResponseQueue. last else {
80+ guard let leadPromise = self . commandResponseQueue. popFirst ( ) else {
7781 assertionFailure ( " Read triggered with an empty promise queue! Ignoring: \( value) " )
7882 logger. critical ( " Read triggered with no promise waiting in the queue! " )
7983 return
8084 }
8185
82- let popped = commandResponseQueue. popLast ( )
83- assert ( popped != nil )
84-
8586 switch value {
8687 case . error( let e) :
8788 leadPromise. fail ( e)
@@ -108,8 +109,11 @@ extension RedisCommandHandler: ChannelOutboundHandler {
108109 ///
109110 /// See `ChannelOutboundHandler.write(context:data:promise:)`
110111 public func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
111- let commandContext = unwrapOutboundIn ( data)
112- commandResponseQueue. insert ( commandContext. responsePromise, at: 0 )
113- context. write ( wrapOutboundOut ( commandContext. command) , promise: promise)
112+ let commandContext = self . unwrapOutboundIn ( data)
113+ self . commandResponseQueue. append ( commandContext. responsePromise)
114+ context. write (
115+ self . wrapOutboundOut ( commandContext. command) ,
116+ promise: promise
117+ )
114118 }
115119}
0 commit comments