@@ -265,7 +265,7 @@ private void lazyListen() {
265265 }
266266
267267 try {
268- futureToAwait .get (getMaxSubscriptionRegistrationWaitingTime (), TimeUnit .SECONDS );
268+ futureToAwait .get (getMaxSubscriptionRegistrationWaitingTime (), TimeUnit .MILLISECONDS );
269269 } catch (InterruptedException e ) {
270270 Thread .currentThread ().interrupt ();
271271 } catch (ExecutionException e ) {
@@ -391,15 +391,13 @@ private boolean doUnsubscribe() {
391391 return true ;
392392 }
393393
394- try {
395- listenFuture .join ();
396- } catch (Exception e ) {
397- // ignore, just await completion here.
398- }
394+ awaitRegistrationTime (listenFuture );
399395
400396 if (this .state .compareAndSet (state , State .prepareUnsubscribe ())) {
401397
402- getRequiredSubscriber ().cancel ();
398+ getRequiredSubscriber ().unsubscribeAll ();
399+
400+ awaitRegistrationTime (this .unsubscribeFuture );
403401
404402 this .state .set (State .notListening ());
405403
@@ -416,6 +414,16 @@ private boolean doUnsubscribe() {
416414 }
417415 }
418416
417+ private void awaitRegistrationTime (CompletableFuture <Void > future ) {
418+ try {
419+ future .get (getMaxSubscriptionRegistrationWaitingTime (), TimeUnit .MILLISECONDS );
420+ } catch (InterruptedException e ) {
421+ Thread .currentThread ().interrupt ();
422+ } catch (ExecutionException | TimeoutException e ) {
423+ // ignore
424+ }
425+ }
426+
419427 @ Override
420428 public boolean isRunning () {
421429 return this .started .get ();
@@ -876,7 +884,8 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO
876884 long recoveryInterval = backOffExecution .nextBackOff ();
877885
878886 if (recoveryInterval != BackOffExecution .STOP ) {
879- logger .error (String .format ("Connection failure occurred: %s. Restarting subscription task after %s ms." , ex , recoveryInterval ));
887+ logger .error (String .format ("Connection failure occurred: %s. Restarting subscription task after %s ms." , ex ,
888+ recoveryInterval ), ex );
880889 }
881890
882891 return recoveryInterval ;
@@ -897,7 +906,9 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO
897906 return ;
898907 }
899908
900- logger .error ("SubscriptionTask aborted with exception:" , ex );
909+ if (isRunning ()) { // log only if the container is still running to prevent close errors from logging
910+ logger .error ("SubscriptionTask aborted with exception:" , ex );
911+ }
901912 future .completeExceptionally (ex );
902913 }
903914
@@ -1216,6 +1227,26 @@ void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronizion s
12161227 this .synchronizingMessageListener .addSynchronization (synchronizer );
12171228 }
12181229
1230+ public void unsubscribeAll () {
1231+
1232+ synchronized (localMonitor ) {
1233+
1234+ RedisConnection connection = this .connection ;
1235+ if (connection == null ) {
1236+ return ;
1237+ }
1238+
1239+ doUnsubscribe (connection );
1240+ }
1241+ }
1242+
1243+ void doUnsubscribe (RedisConnection connection ) {
1244+ closeSubscription (connection );
1245+ closeConnection ();
1246+
1247+ unsubscribeFuture .complete (null );
1248+ }
1249+
12191250 /**
12201251 * Cancel all subscriptions and close the connection.
12211252 */
@@ -1228,26 +1259,34 @@ public void cancel() {
12281259 return ;
12291260 }
12301261
1231- if ( logger . isTraceEnabled ()) {
1232- logger . trace ( "Cancelling Redis subscription..." );
1233- }
1262+ doCancel ( connection );
1263+ }
1264+ }
12341265
1235- Subscription sub = connection .getSubscription ();
1266+ void doCancel (RedisConnection connection ) {
1267+ closeSubscription (connection );
1268+ closeConnection ();
1269+ }
12361270
1237- if ( sub != null ) {
1271+ void closeSubscription ( RedisConnection connection ) {
12381272
1239- if (logger .isTraceEnabled ()) {
1240- logger .trace ("Unsubscribing from all channels " );
1241- }
1273+ if (logger .isTraceEnabled ()) {
1274+ logger .trace ("Cancelling Redis subscription... " );
1275+ }
12421276
1243- try {
1244- sub .close ();
1245- } catch (Exception e ) {
1246- logger .warn ("Unable to unsubscribe from subscriptions" , e );
1247- }
1277+ Subscription sub = connection .getSubscription ();
1278+
1279+ if (sub != null ) {
1280+
1281+ if (logger .isTraceEnabled ()) {
1282+ logger .trace ("Unsubscribing from all channels" );
12481283 }
12491284
1250- closeConnection ();
1285+ try {
1286+ sub .close ();
1287+ } catch (Exception e ) {
1288+ logger .warn ("Unable to unsubscribe from subscriptions" , e );
1289+ }
12511290 }
12521291 }
12531292
@@ -1324,6 +1363,7 @@ private void doWithSubscription(byte[][] data, BiConsumer<Subscription, byte[][]
13241363 }
13251364 }
13261365 }
1366+
13271367 }
13281368
13291369 /**
@@ -1341,6 +1381,11 @@ class BlockingSubscriber extends Subscriber {
13411381 this .executor = executor ;
13421382 }
13431383
1384+ @ Override
1385+ void doUnsubscribe (RedisConnection connection ) {
1386+ closeSubscription (connection ); // connection will be closed after exiting the doSubscribe method
1387+ }
1388+
13441389 @ Override
13451390 protected void eventuallyPerformSubscription (RedisConnection connection , BackOffExecution backOffExecution ,
13461391 CompletableFuture <Void > subscriptionDone , Collection <byte []> patterns , Collection <byte []> channels ) {
@@ -1369,6 +1414,8 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
13691414
13701415 try {
13711416 doSubscribe (connection , patterns , initiallySubscribeToChannels );
1417+ closeConnection ();
1418+ unsubscribeFuture .complete (null );
13721419 } catch (Throwable t ) {
13731420 handleSubscriptionException (subscriptionDone , backOffExecution , t );
13741421 }
0 commit comments