@@ -371,7 +371,7 @@ private void lazyListen() {
371371 State state = this .state .get ();
372372
373373 CompletableFuture <Void > futureToAwait = state .isPrepareListening () ? containerListenFuture
374- : lazyListen (this .backOff .start ());
374+ : lazyListen (new InitialBackoffExecution ( this .backOff .start () ));
375375
376376 try {
377377 futureToAwait .get (getMaxSubscriptionRegistrationWaitingTime (), TimeUnit .MILLISECONDS );
@@ -537,8 +537,7 @@ private void awaitRegistrationTime(CompletableFuture<Void> future) {
537537 future .get (getMaxSubscriptionRegistrationWaitingTime (), TimeUnit .MILLISECONDS );
538538 } catch (InterruptedException ex ) {
539539 Thread .currentThread ().interrupt ();
540- } catch (ExecutionException | TimeoutException ignore ) {
541- }
540+ } catch (ExecutionException | TimeoutException ignore ) {}
542541 }
543542
544543 @ Override
@@ -890,8 +889,13 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO
890889
891890 Runnable recoveryFunction = () -> {
892891
893- CompletableFuture <Void > lazyListen = lazyListen (backOffExecution );
894- lazyListen .whenComplete (propagate (future ));
892+ CompletableFuture <Void > lazyListen = lazyListen (new RecoveryBackoffExecution (backOffExecution ));
893+ lazyListen .whenComplete (propagate (future )).thenRun (() -> {
894+
895+ if (backOffExecution instanceof RecoveryAfterSubscriptionBackoffExecution ) {
896+ logger .info ("Subscription(s) recovered" );
897+ }
898+ });
895899 };
896900
897901 if (potentiallyRecover (loggingBackOffExecution , recoveryFunction )) {
@@ -984,7 +988,7 @@ private boolean hasTopics() {
984988 private Subscriber getRequiredSubscriber () {
985989
986990 Assert .state (this .subscriber != null ,
987- "Subscriber not created; Configure RedisConnectionFactory to create a Subscriber" );
991+ "Subscriber not created; Configure RedisConnectionFactory to create a Subscriber. Make sure that afterPropertiesSet() has been called " );
988992
989993 return this .subscriber ;
990994 }
@@ -1015,6 +1019,54 @@ private void logTrace(Supplier<String> message) {
10151019 }
10161020 }
10171021
1022+ BackOffExecution nextBackoffExecution (BackOffExecution backOffExecution , boolean subscribed ) {
1023+
1024+ if (subscribed ) {
1025+ return new RecoveryAfterSubscriptionBackoffExecution (backOff .start ());
1026+ }
1027+
1028+ return backOffExecution ;
1029+ }
1030+
1031+ /**
1032+ * Marker for an initial backoff.
1033+ *
1034+ * @param delegate
1035+ */
1036+ record InitialBackoffExecution (BackOffExecution delegate ) implements BackOffExecution {
1037+
1038+ @ Override
1039+ public long nextBackOff () {
1040+ return delegate .nextBackOff ();
1041+ }
1042+ }
1043+
1044+ /**
1045+ * Marker for a recovery after a subscription has been active previously.
1046+ *
1047+ * @param delegate
1048+ */
1049+ record RecoveryAfterSubscriptionBackoffExecution (BackOffExecution delegate ) implements BackOffExecution {
1050+
1051+ @ Override
1052+ public long nextBackOff () {
1053+ return delegate .nextBackOff ();
1054+ }
1055+ }
1056+
1057+ /**
1058+ * Marker for a recovery execution.
1059+ *
1060+ * @param delegate
1061+ */
1062+ record RecoveryBackoffExecution (BackOffExecution delegate ) implements BackOffExecution {
1063+
1064+ @ Override
1065+ public long nextBackOff () {
1066+ return delegate .nextBackOff ();
1067+ }
1068+ }
1069+
10181070 /**
10191071 * Represents an operation that accepts three input arguments {@link SubscriptionListener},
10201072 * {@code channel or pattern}, and {@code count} and returns no result.
@@ -1189,10 +1241,15 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
11891241 try {
11901242 eventuallyPerformSubscription (connection , backOffExecution , initFuture , patterns , channels );
11911243 } catch (Throwable t ) {
1192- handleSubscriptionException (initFuture , backOffExecution , t );
1244+ handleSubscriptionException (initFuture , nextBackoffExecution (backOffExecution , connection .isSubscribed ()),
1245+ t );
11931246 }
11941247 } catch (RuntimeException ex ) {
1195- initFuture .completeExceptionally (ex );
1248+ if (backOffExecution instanceof InitialBackoffExecution ) {
1249+ initFuture .completeExceptionally (ex );
1250+ } else {
1251+ handleSubscriptionException (initFuture , backOffExecution , ex );
1252+ }
11961253 }
11971254
11981255 return initFuture ;
@@ -1205,8 +1262,9 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
12051262 void eventuallyPerformSubscription (RedisConnection connection , BackOffExecution backOffExecution ,
12061263 CompletableFuture <Void > subscriptionDone , Collection <byte []> patterns , Collection <byte []> channels ) {
12071264
1208- addSynchronization (new SynchronizingMessageListener .SubscriptionSynchronization (patterns , channels ,
1209- () -> subscriptionDone .complete (null )));
1265+ addSynchronization (new SynchronizingMessageListener .SubscriptionSynchronization (patterns , channels , () -> {
1266+ subscriptionDone .complete (null );
1267+ }));
12101268
12111269 doSubscribe (connection , patterns , channels );
12121270 }
@@ -1412,7 +1470,7 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
14121470 try {
14131471 subscribeChannel (channels .toArray (new byte [0 ][]));
14141472 } catch (Exception ex ) {
1415- handleSubscriptionException (subscriptionDone , backOffExecution , ex );
1473+ handleSubscriptionException (subscriptionDone , nextBackoffExecution ( backOffExecution , true ) , ex );
14161474 }
14171475 }));
14181476 } else {
@@ -1429,7 +1487,8 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
14291487 closeConnection ();
14301488 unsubscribeFuture .complete (null );
14311489 } catch (Throwable cause ) {
1432- handleSubscriptionException (subscriptionDone , backOffExecution , cause );
1490+ handleSubscriptionException (subscriptionDone ,
1491+ nextBackoffExecution (backOffExecution , connection .isSubscribed ()), cause );
14331492 }
14341493 });
14351494 }
0 commit comments