|
19 | 19 | import org.springframework.context.annotation.Configuration; |
20 | 20 | import org.springframework.context.annotation.Import; |
21 | 21 | import reactor.core.publisher.Mono; |
22 | | -import reactor.core.scheduler.Scheduler; |
23 | | -import reactor.core.scheduler.Schedulers; |
24 | 22 | import reactor.rabbitmq.*; |
25 | 23 |
|
26 | 24 | import java.time.Duration; |
@@ -59,7 +57,9 @@ public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, M |
59 | 57 | channelPoolOptions |
60 | 58 | ); |
61 | 59 |
|
62 | | - final Sender sender = RabbitFlux.createSender(new SenderOptions().channelPool(channelPool)); |
| 60 | + final Sender sender = RabbitFlux.createSender(new SenderOptions() |
| 61 | + .channelPool(channelPool) |
| 62 | + .resourceManagementChannelMono(channelPool.getChannelMono())); |
63 | 63 |
|
64 | 64 | return new ReactiveMessageSender(sender, brokerConfigProps.getAppName(), converter, new TopologyCreator(sender)); |
65 | 65 | } |
@@ -101,13 +101,11 @@ public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSuppli |
101 | 101 | } |
102 | 102 |
|
103 | 103 | Mono<Connection> createConnectionMono(ConnectionFactory factory, String connectionPrefix, String connectionType) { |
104 | | - final Scheduler senderScheduler = Schedulers.elastic(); |
105 | 104 | return Mono.fromCallable(() -> factory.newConnection(connectionPrefix + " " + connectionType)) |
106 | 105 | .doOnError(err -> |
107 | 106 | log.log(Level.SEVERE, "Error creating connection to RabbitMq Broker. Starting retry process...", err) |
108 | 107 | ) |
109 | 108 | .retryBackoff(Long.MAX_VALUE, Duration.ofMillis(300), Duration.ofMillis(3000)) |
110 | | - .subscribeOn(senderScheduler) |
111 | 109 | .cache(); |
112 | 110 | } |
113 | 111 |
|
|
0 commit comments