|
44 | 44 | import org.springframework.context.SmartLifecycle; |
45 | 45 | import org.springframework.core.task.SimpleAsyncTaskExecutor; |
46 | 46 | import org.springframework.core.task.TaskExecutor; |
| 47 | +import org.springframework.dao.DataAccessException; |
47 | 48 | import org.springframework.data.redis.RedisConnectionFailureException; |
48 | 49 | import org.springframework.data.redis.connection.ConnectionUtils; |
49 | 50 | import org.springframework.data.redis.connection.Message; |
|
53 | 54 | import org.springframework.data.redis.connection.Subscription; |
54 | 55 | import org.springframework.data.redis.connection.SubscriptionListener; |
55 | 56 | import org.springframework.data.redis.connection.util.ByteArrayWrapper; |
| 57 | +import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException; |
56 | 58 | import org.springframework.data.redis.serializer.RedisSerializer; |
57 | 59 | import org.springframework.data.redis.serializer.StringRedisSerializer; |
58 | 60 | import org.springframework.lang.Nullable; |
@@ -181,7 +183,6 @@ public void afterPropertiesSet() { |
181 | 183 | subscriptionExecutor = taskExecutor; |
182 | 184 | } |
183 | 185 |
|
184 | | - |
185 | 186 | this.subscriber = createSubscriber(connectionFactory, this.subscriptionExecutor); |
186 | 187 |
|
187 | 188 | afterPropertiesSet = true; |
@@ -269,6 +270,11 @@ private void lazyListen() { |
269 | 270 | } catch (InterruptedException e) { |
270 | 271 | Thread.currentThread().interrupt(); |
271 | 272 | } catch (ExecutionException e) { |
| 273 | + |
| 274 | + if (e.getCause() instanceof DataAccessException) { |
| 275 | + throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause()); |
| 276 | + } |
| 277 | + |
272 | 278 | throw new CompletionException(e.getCause()); |
273 | 279 | } catch (TimeoutException e) { |
274 | 280 | throw new IllegalStateException("Subscription registration timeout exceeded.", e); |
@@ -670,7 +676,16 @@ else if (topic instanceof PatternTopic) { |
670 | 676 | getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][])); |
671 | 677 | getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][])); |
672 | 678 |
|
673 | | - future.join(); |
| 679 | + try { |
| 680 | + future.join(); |
| 681 | + } catch (CompletionException e) { |
| 682 | + |
| 683 | + if (e.getCause() instanceof DataAccessException) { |
| 684 | + throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause()); |
| 685 | + } |
| 686 | + |
| 687 | + throw e; |
| 688 | + } |
674 | 689 | } |
675 | 690 | } |
676 | 691 | } |
@@ -1166,23 +1181,25 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col |
1166 | 1181 |
|
1167 | 1182 | synchronized (localMonitor) { |
1168 | 1183 |
|
1169 | | - RedisConnection connection = connectionFactory.getConnection(); |
1170 | | - this.connection = connection; |
1171 | | - |
1172 | | - if (connection.isSubscribed()) { |
| 1184 | + CompletableFuture<Void> initFuture = new CompletableFuture<>(); |
| 1185 | + try { |
| 1186 | + RedisConnection connection = connectionFactory.getConnection(); |
| 1187 | + this.connection = connection; |
1173 | 1188 |
|
1174 | | - CompletableFuture<Void> failure = new CompletableFuture<>(); |
1175 | | - failure.completeExceptionally( |
1176 | | - new IllegalStateException("Retrieved connection is already subscribed; aborting listening")); |
1177 | | - return failure; |
1178 | | - } |
| 1189 | + if (connection.isSubscribed()) { |
1179 | 1190 |
|
1180 | | - CompletableFuture<Void> initFuture = new CompletableFuture<>(); |
| 1191 | + initFuture.completeExceptionally( |
| 1192 | + new IllegalStateException("Retrieved connection is already subscribed; aborting listening")); |
| 1193 | + return initFuture; |
| 1194 | + } |
1181 | 1195 |
|
1182 | | - try { |
1183 | | - eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels); |
1184 | | - } catch (Throwable t) { |
1185 | | - handleSubscriptionException(initFuture, backOffExecution, t); |
| 1196 | + try { |
| 1197 | + eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels); |
| 1198 | + } catch (Throwable t) { |
| 1199 | + handleSubscriptionException(initFuture, backOffExecution, t); |
| 1200 | + } |
| 1201 | + } catch (RuntimeException e) { |
| 1202 | + initFuture.completeExceptionally(e); |
1186 | 1203 | } |
1187 | 1204 |
|
1188 | 1205 | return initFuture; |
|
0 commit comments