22//
33// This source file is part of the RediStack open source project
44//
5- // Copyright (c) 2019-2020 RediStack project authors
5+ // Copyright (c) 2019-2022 RediStack project authors
66// Licensed under Apache License v2.0
77//
88// See LICENSE.txt for license information
@@ -26,7 +26,7 @@ extension TimeAmount {
2626
2727// MARK: Pipeline manipulation
2828
29- extension Channel {
29+ extension ChannelPipeline {
3030 /// Adds the baseline channel handlers needed to support sending and receiving messages in Redis Serialization Protocol (RESP) format to the pipeline.
3131 ///
3232 /// For implementation details, see `RedisMessageEncoder`, `RedisByteDecoder`, and `RedisCommandHandler`.
@@ -62,7 +62,7 @@ extension Channel {
6262 ( RedisCommandHandler ( ) , " RediStack.CommandHandler " )
6363 ]
6464 return . andAllSucceed(
65- handlers. map { self . pipeline . addHandler ( $0, name: $1) } ,
65+ handlers. map { self . addHandler ( $0, name: $1) } ,
6666 on: self . eventLoop
6767 )
6868 }
@@ -106,14 +106,38 @@ extension Channel {
106106 /// | [ Socket.read ] | | [ Socket.write ] |
107107 /// +-----------------+ +------------------+
108108 /// - Returns: A `NIO.EventLoopFuture` that resolves the instance of the PubSubHandler that was added to the pipeline.
109- public func addPubSubHandler( ) -> EventLoopFuture < RedisPubSubHandler > {
110- return self . pipeline
111- . handler ( type: RedisCommandHandler . self)
112- . flatMap {
113- let pubsubHandler = RedisPubSubHandler ( eventLoop: self . eventLoop)
114- return self . pipeline
115- . addHandler ( pubsubHandler, name: " RediStack.PubSubHandler " , position: . before( $0) )
116- . map { pubsubHandler }
109+ public func addRedisPubSubHandler( ) -> EventLoopFuture < RedisPubSubHandler > {
110+ // first try to return the handler that already exists in the pipeline
111+
112+ return self . handler ( type: RedisPubSubHandler . self)
113+ . flatMapError {
114+ // if it doesn't exist, add it to the pipeline
115+ guard
116+ let error = $0 as? ChannelPipelineError ,
117+ error == . notFound
118+ else { return self . eventLoop. makeFailedFuture ( $0) }
119+
120+ return self . handler ( type: RedisCommandHandler . self)
121+ . flatMap {
122+ let pubsubHandler = RedisPubSubHandler ( eventLoop: self . eventLoop)
123+ return self . addHandler ( pubsubHandler, name: " RediStack.PubSubHandler " , position: . before( $0) )
124+ . map { pubsubHandler }
125+ }
126+ }
127+ }
128+
129+ /// Removes the provided Redis PubSub handler.
130+ /// - Returns: A `NIO.EventLoopFuture` that resolves when the handler was removed from the pipeline.
131+ public func removeRedisPubSubHandler( _ handler: RedisPubSubHandler ) -> EventLoopFuture < Void > {
132+ self . removeHandler ( handler)
133+ . flatMapError {
134+ // if it was already removed, then we can just succeed
135+ guard
136+ let error = $0 as? ChannelPipelineError ,
137+ error == . alreadyRemoved
138+ else { return self . eventLoop. makeFailedFuture ( $0) }
139+
140+ return self . eventLoop. makeSucceededVoidFuture ( )
117141 }
118142 }
119143}
@@ -135,6 +159,6 @@ extension ClientBootstrap {
135159 ChannelOptions . socket ( SocketOptionLevel ( SOL_SOCKET) , SO_REUSEADDR) ,
136160 value: 1
137161 )
138- . channelInitializer { $0. addBaseRedisHandlers ( ) }
162+ . channelInitializer { $0. pipeline . addBaseRedisHandlers ( ) }
139163 }
140164}
0 commit comments