|
7 | 7 | import lombok.extern.java.Log; |
8 | 8 | import org.reactivecommons.api.domain.Command; |
9 | 9 | import org.reactivecommons.api.domain.DomainEvent; |
| 10 | +import org.reactivecommons.api.domain.DomainEventBus; |
10 | 11 | import org.reactivecommons.async.api.AsyncQuery; |
11 | 12 | import org.reactivecommons.async.api.DefaultCommandHandler; |
12 | 13 | import org.reactivecommons.async.api.DefaultQueryHandler; |
13 | 14 | import org.reactivecommons.async.api.DynamicRegistry; |
14 | 15 | import org.reactivecommons.async.api.HandlerRegistry; |
| 16 | +import org.reactivecommons.async.commons.DiscardNotifier; |
15 | 17 | import org.reactivecommons.async.commons.communications.Message; |
16 | 18 | import org.reactivecommons.async.commons.config.BrokerConfig; |
17 | 19 | import org.reactivecommons.async.commons.config.IBrokerConfigProps; |
|
21 | 23 | import org.reactivecommons.async.commons.ext.CustomReporter; |
22 | 24 | import org.reactivecommons.async.rabbit.DynamicRegistryImp; |
23 | 25 | import org.reactivecommons.async.rabbit.HandlerResolver; |
| 26 | +import org.reactivecommons.async.rabbit.RabbitDiscardNotifier; |
| 27 | +import org.reactivecommons.async.rabbit.RabbitDomainEventBus; |
24 | 28 | import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener; |
25 | 29 | import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; |
26 | 30 | import org.reactivecommons.async.rabbit.communications.TopologyCreator; |
@@ -67,14 +71,20 @@ public class RabbitMqConfig { |
67 | 71 |
|
68 | 72 |
|
69 | 73 | @Bean |
70 | | - public ConnectionManager buildConnectionManager(AsyncPropsDomain props, MessageConverter converter) { |
| 74 | + public ConnectionManager buildConnectionManager(AsyncPropsDomain props, MessageConverter converter, |
| 75 | + BrokerConfig brokerConfig, ObjectMapperSupplier objectMapperSupplier) { |
71 | 76 | ConnectionManager connectionManager = new ConnectionManager(); |
72 | 77 | props.forEach((domain, properties) -> { |
73 | 78 | ConnectionFactoryProvider provider = createConnectionFactoryProvider(properties.getConnectionProperties()); |
74 | 79 | ReactiveMessageSender sender = createMessageSender(provider, properties, converter); |
75 | 80 | ReactiveMessageListener listener = createMessageListener(provider, properties); |
76 | 81 | connectionManager.addDomain(domain, listener, sender, provider); |
77 | 82 | }); |
| 83 | + ReactiveMessageSender appDomainSender = connectionManager.getSender(DEFAULT_DOMAIN); |
| 84 | + DomainEventBus appDomainEventBus = new RabbitDomainEventBus(appDomainSender, props.getProps(DEFAULT_DOMAIN) |
| 85 | + .getBrokerConfigProps().getDomainEventsExchangeName(), brokerConfig); |
| 86 | + DiscardNotifier notifier = new RabbitDiscardNotifier(appDomainEventBus, objectMapperSupplier.get()); |
| 87 | + connectionManager.setDiscardNotifierForAll(notifier); |
78 | 88 | return connectionManager; |
79 | 89 | } |
80 | 90 |
|
|
0 commit comments