|
8 | 8 | import reactor.rabbitmq.BindingSpecification; |
9 | 9 | import reactor.rabbitmq.ExchangeSpecification; |
10 | 10 | import reactor.rabbitmq.QueueSpecification; |
| 11 | +import reactor.rabbitmq.Sender; |
11 | 12 |
|
12 | 13 | import java.io.IOException; |
13 | 14 | import java.time.Duration; |
|
19 | 20 | */ |
20 | 21 | public class TopologyCreator { |
21 | 22 |
|
22 | | - private final Mono<Channel> channel; |
| 23 | + private final Sender sender; |
23 | 24 |
|
24 | | - public TopologyCreator(Mono<Connection> connectionMono) { |
25 | | - this.channel = connectionMono.map(connection -> { |
26 | | - try { |
27 | | - return connection.createChannel(); |
28 | | - } catch (IOException e) { |
29 | | - throw new TopologyDefException("Fail to create channel", e); |
30 | | - } |
31 | | - }).doOnError(e -> log.log(Level.SEVERE, e.getMessage(), e)) |
32 | | - .retryBackoff(5, Duration.ofMillis(500)) |
33 | | - .cache(); |
| 25 | + public TopologyCreator(Sender sender) { |
| 26 | + this.sender = sender; |
34 | 27 | } |
35 | 28 |
|
36 | | - public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchange){ |
37 | | - return channel.map(ch -> { |
38 | | - try { |
39 | | - return ch.exchangeDeclare(exchange.getName(), exchange.getType(), exchange.isDurable(), exchange.isAutoDelete(), exchange.getArguments()); |
40 | | - } catch (IOException e) { |
41 | | - throw new TopologyDefException("Fail to declare exchange: " + exchange.getName(), e); |
42 | | - } |
43 | | - }); |
| 29 | + public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchange) { |
| 30 | + return sender.declare(exchange) |
| 31 | + .onErrorMap(TopologyDefException::new); |
44 | 32 | } |
45 | 33 |
|
46 | | - public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification queue){ |
47 | | - return channel.map(ch -> { |
48 | | - try { |
49 | | - return ch.queueDeclare(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), queue.getArguments()); |
50 | | - } catch (IOException e) { |
51 | | - throw new TopologyDefException("Fail to declare queue: " + queue.getName(), e); |
52 | | - } |
53 | | - }); |
| 34 | + public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification queue) { |
| 35 | + return sender.declare(queue) |
| 36 | + .onErrorMap(TopologyDefException::new); |
54 | 37 | } |
55 | 38 |
|
56 | | - public Mono<AMQP.Queue.BindOk> bind(BindingSpecification binding){ |
57 | | - return channel.map(ch -> { |
58 | | - try { |
59 | | - return ch.queueBind(binding.getQueue(), binding.getExchange(), binding.getRoutingKey(), binding.getArguments()); |
60 | | - } catch (IOException e) { |
61 | | - throw new TopologyDefException("Fail to bind queue: " + binding.getQueue(), e); |
62 | | - } |
63 | | - }); |
| 39 | + public Mono<AMQP.Queue.BindOk> bind(BindingSpecification binding) { |
| 40 | + return sender.bind(binding) |
| 41 | + .onErrorMap(TopologyDefException::new); |
64 | 42 | } |
65 | 43 |
|
66 | 44 | public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification binding) { |
67 | | - return channel.map(ch -> { |
68 | | - try { |
69 | | - return ch.queueUnbind(binding.getQueue(), binding.getExchange(), binding.getRoutingKey(), binding.getArguments()); |
70 | | - } catch (IOException e) { |
71 | | - throw new TopologyDefException("Fail to unbind queue: " + binding.getQueue(), e); |
72 | | - } |
73 | | - }) ; |
| 45 | + return sender.unbind(binding) |
| 46 | + .onErrorMap(TopologyDefException::new); |
74 | 47 | } |
75 | 48 |
|
76 | 49 | public static class TopologyDefException extends RuntimeException { |
77 | | - public TopologyDefException(String message, Throwable cause) { |
78 | | - super(message, cause); |
| 50 | + public TopologyDefException(Throwable cause) { |
| 51 | + super(cause); |
79 | 52 | } |
80 | 53 | } |
81 | 54 | } |
0 commit comments