@@ -108,14 +108,19 @@ public final class RedisPubSubHandler {
108108 /// A queue of unsubscribe changes awaiting notification of completion.
109109 private var pendingUnsubscribes : PendingSubscriptionChangeQueue
110110
111+ private let eventLoop : EventLoop
112+
111113 // we need to be extra careful not to use this context before we know we've initialized
112114 private var context : ChannelHandlerContext !
113115
114- /// - Parameter queueCapacity: The initial capacity of queues used for processing subscription changes. The initial value is `3`.
116+ /// - Parameters:
117+ /// - eventLoop: The event loop the `NIO.Channel` that this handler was added to is bound to.
118+ /// - queueCapacity: The initial capacity of queues used for processing subscription changes. The initial value is `3`.
115119 ///
116- /// Unless you are subscribing and unsubscribing from a large volume of channels or patterns at a single time,
117- /// such as a single SUBSCRIBE call, you do not need to modify this value.
118- public init ( initialSubscriptionQueueCapacity queueCapacity: Int = 3 ) {
120+ /// Unless you are subscribing and unsubscribing from a large volume of channels or patterns at a single time,
121+ /// such as a single SUBSCRIBE call, you do not need to modify this value.
122+ public init ( eventLoop: EventLoop , initialSubscriptionQueueCapacity queueCapacity: Int = 3 ) {
123+ self . eventLoop = eventLoop
119124 self . subscriptions = [ : ]
120125 self . pendingSubscribes = [ : ]
121126 self . pendingUnsubscribes = [ : ]
@@ -183,8 +188,21 @@ extension RedisPubSubHandler {
183188 onSubscribe subscribeHandler: RedisSubscriptionChangeHandler ? ,
184189 onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler ?
185190 ) -> EventLoopFuture < Int > {
191+ guard self . eventLoop. inEventLoop else {
192+ return self . eventLoop. flatSubmit {
193+ return self . addSubscription (
194+ for: target,
195+ messageReceiver: receiver,
196+ onSubscribe: subscribeHandler,
197+ onUnsubscribe: unsubscribeHandler
198+ )
199+ }
200+ }
201+
186202 switch self . state {
187- case let . error( e) : return self . context. eventLoop. makeFailedFuture ( e)
203+ case . removed: return self . eventLoop. makeFailedFuture ( RedisClientError . subscriptionModeRaceCondition)
204+
205+ case let . error( e) : return self . eventLoop. makeFailedFuture ( e)
188206
189207 case . default:
190208 // go through all the target patterns/names and update the map with the new receiver if it's already registered
@@ -206,7 +224,7 @@ extension RedisPubSubHandler {
206224 // if there aren't any new actual subscriptions,
207225 // then we just short circuit and return our local count of subscriptions
208226 guard !newSubscriptionTargets. isEmpty else {
209- return self . context . eventLoop. makeSucceededFuture ( self . subscriptions. count)
227+ return self . eventLoop. makeSucceededFuture ( self . subscriptions. count)
210228 }
211229
212230 return self . sendSubscriptionChange (
@@ -221,9 +239,13 @@ extension RedisPubSubHandler {
221239 /// - Parameter target: The channel or pattern that a receiver should be removed for.
222240 /// - Returns: A `NIO.EventLoopFuture` that resolves the number of subscriptions the client has after the subscription has been removed.
223241 public func removeSubscription( for target: RedisSubscriptionTarget ) -> EventLoopFuture < Int > {
242+ guard self . eventLoop. inEventLoop else {
243+ return self . eventLoop. flatSubmit { self . removeSubscription ( for: target) }
244+ }
245+
224246 // if we're not in our default state,
225247 // this essentially is a no-op because an error triggers all receivers to be removed
226- guard case . default = self . state else { return self . context . eventLoop. makeSucceededFuture ( 0 ) }
248+ guard case . default = self . state else { return self . eventLoop. makeSucceededFuture ( 0 ) }
227249
228250 // we send the UNSUBSCRIBE message to Redis,
229251 // and in the response we handle the actual removal of the receiver closure
@@ -240,15 +262,7 @@ extension RedisPubSubHandler {
240262 subscriptionTargets targets: [ String ] ,
241263 queue pendingQueue: ReferenceWritableKeyPath < RedisPubSubHandler , PendingSubscriptionChangeQueue >
242264 ) -> EventLoopFuture < Int > {
243- guard self . context. eventLoop. inEventLoop else {
244- return self . context. eventLoop. flatSubmit {
245- return self . sendSubscriptionChange (
246- subscriptionChangeKeyword: keyword,
247- subscriptionTargets: targets,
248- queue: pendingQueue
249- )
250- }
251- }
265+ self . eventLoop. assertInEventLoop ( )
252266
253267 var command = [ RESPValue ( bulk: keyword) ]
254268 command. append ( convertingContentsOf: targets)
@@ -263,7 +277,7 @@ extension RedisPubSubHandler {
263277
264278 // create them
265279 let pendingSubscriptions : [ ( String , EventLoopPromise < Int > ) ] = targets. map {
266- return ( $0, self . context . eventLoop. makePromise ( ) )
280+ return ( $0, self . eventLoop. makePromise ( ) )
267281 }
268282 // add the subscription change handler to the appropriate queue for each individual subscription target
269283 pendingSubscriptions. forEach { self [ keyPath: pendingQueue] . updateValue ( $1, forKey: $0) }
@@ -272,13 +286,13 @@ extension RedisPubSubHandler {
272286 let subscriptionCountFuture = EventLoopFuture < Int >
273287 . whenAllComplete (
274288 pendingSubscriptions. map { $0. 1 . futureResult } ,
275- on: self . context . eventLoop
289+ on: self . eventLoop
276290 )
277291 . flatMapThrowing { ( results) -> Int in
278292 // trust the last success response as the most current count
279293 guard let latestSubscriptionCount = results
280294 . lazy
281- . reversed ( ) // reverse to save complexity, as we just need the last (first) successful value
295+ . reversed ( ) // reverse to save time- complexity, as we just need the last (first) successful value
282296 . compactMap ( { try ? $0. get ( ) } )
283297 . first
284298 // if we have no success cases, we will still have at least one response that we can
@@ -309,7 +323,8 @@ extension RedisPubSubHandler {
309323
310324extension RedisPubSubHandler : RemovableChannelHandler {
311325 public func removeHandler( context: ChannelHandlerContext , removalToken: ChannelHandlerContext . RemovalToken ) {
312- // leave immediately so we don't get any more subscription requests
326+ // update our state and leave immediately so we don't get any more subscription requests
327+ self . state = . removed
313328 context. leavePipeline ( removalToken: removalToken)
314329 // "close" all subscription handlers
315330 self . removeAllReceivers ( )
@@ -335,16 +350,16 @@ extension RedisPubSubHandler: ChannelInboundHandler {
335350 // these guards extract some of the basic details of a pubsub message
336351 guard
337352 let array = value. array,
338- ! array. isEmpty ,
353+ array. count >= 3 ,
339354 let channelOrPattern = array [ 1 ] . string,
340355 let messageKeyword = array [ 0 ] . string
341356 else {
342357 context. fireChannelRead ( data)
343358 return
344359 }
345360
346- // safe because the array is guaranteed from the guard above to have at least 1 element
347- // and it is not to be used until we match the PubSub message keyword
361+ // safe because the array is guaranteed from the guard above to have at least 3 elements
362+ // and it is NOT to be used until we match the PubSub message keyword
348363 let message = array. last!
349364
350365 // the last check is to match one of the known pubsub message keywords
@@ -438,7 +453,7 @@ extension RedisPubSubHandler {
438453 }
439454
440455 private enum State {
441- case `default`, error( Error )
456+ case `default`, removed , error( Error )
442457 }
443458}
444459
0 commit comments