diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/communications/UnroutableMessageNotifierTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/communications/UnroutableMessageNotifierTest.java index b370e6bf..9600529b 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/communications/UnroutableMessageNotifierTest.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/communications/UnroutableMessageNotifierTest.java @@ -44,9 +44,9 @@ class UnroutableMessageNotifierTest { @BeforeEach void setUp() { - // Usar el constructor por defecto y espiar el sink interno + // Use the default constructor and spy on the internal sink unroutableMessageNotifier = new UnroutableMessageNotifier(); - // Inyectar el mock del sink usando un spy para poder verificarlo + // Inject the sink mock using a spy to verify it try { java.lang.reflect.Field sinkField = UnroutableMessageNotifier.class.getDeclaredField("sink"); sinkField.setAccessible(true); diff --git a/build.gradle b/build.gradle index e504f49c..1996c4e7 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ plugins { id 'org.sonarqube' version '6.3.1.5724' id 'org.springframework.boot' version '3.5.5' apply false id 'io.github.gradle-nexus.publish-plugin' version '2.0.0' - id 'co.com.bancolombia.cleanArchitecture' version '3.25.0' + id 'co.com.bancolombia.cleanArchitecture' version '3.26.1' } repositories { diff --git a/main.gradle b/main.gradle index 79545c89..fe96d10b 100644 --- a/main.gradle +++ b/main.gradle @@ -22,7 +22,7 @@ allprojects { property 'sonar.organization', 'reactive-commons' property 'sonar.host.url', 'https://sonarcloud.io' property "sonar.sources", "src/main" - property "sonar.test", "src/test" + property "sonar.tests", "src/test" property "sonar.java.binaries", "build/classes" property "sonar.junit.reportPaths", "build/test-results/test" property "sonar.java-coveragePlugin", "jacoco" @@ -86,7 +86,7 @@ subprojects { dependencyManagement { imports { - mavenBom 'org.springframework.boot:spring-boot-dependencies:3.5.4' + mavenBom 'org.springframework.boot:spring-boot-dependencies:3.5.5' } } diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomain.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomain.java index a6717026..31dd72a7 100644 --- a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomain.java +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomain.java @@ -32,11 +32,11 @@ public GenericAsyncPropsDomain(String defaultAppName, this.asyncPropsClass = asyncPropsClass; ObjectMapper mapper = new ObjectMapper(); mapper.registerModule(new JavaTimeModule()); - this.computeIfAbsent(DEFAULT_DOMAIN, k -> { - T defaultApp = AsyncPropsDomainBuilder.instantiate(asyncPropsClass); - defaultApp.setConnectionProperties(mapper.convertValue(defaultProperties, propsClass)); - return defaultApp; - }); + + if (!this.containsKey(DEFAULT_DOMAIN)) { + throw new InvalidConfigurationException("Required domain '" + DEFAULT_DOMAIN + "' is not configured."); + } + super.forEach((key, value) -> { // To ensure that each domain has an appName if (value.getAppName() == null) { if (defaultAppName == null || defaultAppName.isEmpty()) { diff --git a/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomainTest.java b/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomainTest.java index b69b8fde..9773b926 100644 --- a/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomainTest.java +++ b/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomainTest.java @@ -25,6 +25,8 @@ void shouldCreateProps() { String defaultAppName = "sample"; MyBrokerConnProps defaultMyBrokerProps = new MyBrokerConnProps(); AsyncMyBrokerPropsDomainProperties configured = new AsyncMyBrokerPropsDomainProperties(); + configured.put(DEFAULT_DOMAIN, new MyBrokerAsyncProps()); + MyBrokerAsyncProps other = new MyBrokerAsyncProps(); other.setAppName(OTHER); configured.put(OTHER, other); diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java index 55e97ebc..fe61e342 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java @@ -141,7 +141,7 @@ public void listenQueries(HandlerResolver resolver) { @Override public void listenReplies() { - if (props.isListenReplies()) { + if (Boolean.TRUE.equals(props.getListenReplies())) { final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, receiver, props.getBrokerConfigProps().getReplyQueue(), diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java index 00331b9e..77218b35 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java @@ -4,7 +4,6 @@ import com.rabbitmq.client.ConnectionFactory; import lombok.AccessLevel; import lombok.NoArgsConstructor; -import lombok.SneakyThrows; import lombok.extern.java.Log; import org.reactivecommons.api.domain.DomainEventBus; import org.reactivecommons.async.commons.DLQDiscardNotifier; @@ -50,18 +49,21 @@ import java.security.cert.CertificateException; import java.time.Duration; import java.util.Arrays; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; @Log @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class RabbitMQSetupUtils { - private static final String LISTENER_TYPE = "listener"; - private static final String TOPOLOGY_TYPE = "topology"; - private static final String SENDER_TYPE = "sender"; + private static final String SHARED_TYPE = "shared"; private static final String DEFAULT_PROTOCOL; public static final int START_INTERVAL = 300; public static final int MAX_BACKOFF_INTERVAL = 3000; + private static final ConcurrentMap FACTORY_CACHE = new ConcurrentHashMap<>(); + private static final ConcurrentMap> CONNECTION_CACHE = new ConcurrentHashMap<>(); + static { String protocol = "TLSv1.1"; try { @@ -78,17 +80,23 @@ public final class RabbitMQSetupUtils { DEFAULT_PROTOCOL = protocol; } - @SneakyThrows public static ConnectionFactoryProvider connectionFactoryProvider(RabbitProperties properties) { - final ConnectionFactory factory = new ConnectionFactory(); - PropertyMapper map = PropertyMapper.get(); - map.from(properties::determineHost).whenNonNull().to(factory::setHost); - map.from(properties::determinePort).to(factory::setPort); - map.from(properties::determineUsername).whenNonNull().to(factory::setUsername); - map.from(properties::determinePassword).whenNonNull().to(factory::setPassword); - map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost); - factory.useNio(); - setUpSSL(factory, properties); + final ConnectionFactory factory = FACTORY_CACHE.computeIfAbsent(properties, props -> { + try { + ConnectionFactory newFactory = new ConnectionFactory(); + PropertyMapper map = PropertyMapper.get(); + map.from(props::determineHost).whenNonNull().to(newFactory::setHost); + map.from(props::determinePort).to(newFactory::setPort); + map.from(props::determineUsername).whenNonNull().to(newFactory::setUsername); + map.from(props::determinePassword).whenNonNull().to(newFactory::setPassword); + map.from(props::determineVirtualHost).whenNonNull().to(newFactory::setVirtualHost); + newFactory.useNio(); + setUpSSL(newFactory, props); + return newFactory; + } catch (Exception e) { + throw new RuntimeException("Error creating ConnectionFactory: ", e); + } + }); return () -> factory; } @@ -107,7 +115,7 @@ public static ReactiveMessageSender createMessageSender(ConnectionFactoryProvide public static ReactiveMessageListener createMessageListener(ConnectionFactoryProvider provider, AsyncProps props) { final Mono connection = - createConnectionMono(provider.getConnectionFactory(), props.getAppName(), LISTENER_TYPE); + createConnectionMono(provider.getConnectionFactory(), props.getAppName()); final Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connection)); final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connection)); @@ -119,8 +127,7 @@ public static ReactiveMessageListener createMessageListener(ConnectionFactoryPro public static TopologyCreator createTopologyCreator(AsyncProps props) { ConnectionFactoryProvider provider = connectionFactoryProvider(props.getConnectionProperties()); - final Mono connection = createConnectionMono(provider.getConnectionFactory(), - props.getAppName(), TOPOLOGY_TYPE); + final Mono connection = createConnectionMono(provider.getConnectionFactory(), props.getAppName()); final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connection)); return new TopologyCreator(sender, props.getQueueType()); } @@ -134,8 +141,7 @@ public static DiscardNotifier createDiscardNotifier(ReactiveMessageSender sender private static SenderOptions reactiveCommonsSenderOptions(String appName, ConnectionFactoryProvider provider, RabbitProperties rabbitProperties) { - final Mono senderConnection = createConnectionMono(provider.getConnectionFactory(), appName, - SENDER_TYPE); + final Mono senderConnection = createConnectionMono(provider.getConnectionFactory(), appName); final ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions(); final PropertyMapper map = PropertyMapper.get(); @@ -153,18 +159,20 @@ private static SenderOptions reactiveCommonsSenderOptions(String appName, Connec .transform(Utils::cache)); } - private static Mono createConnectionMono(ConnectionFactory factory, String connectionPrefix, - String connectionType) { - log.info("Creating connection mono to RabbitMQ Broker in host '" + factory.getHost() + "' with " + - "type: " + connectionType); - return Mono.fromCallable(() -> factory.newConnection(connectionPrefix + " " + connectionType)) - .doOnError(err -> - log.log(Level.SEVERE, "Error creating connection to RabbitMQ Broker in host '" + - factory.getHost() + "'. Starting retry process...", err) - ) - .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(START_INTERVAL)) - .maxBackoff(Duration.ofMillis(MAX_BACKOFF_INTERVAL))) - .cache(); + private static Mono createConnectionMono(ConnectionFactory factory, String appName) { + return CONNECTION_CACHE.computeIfAbsent(factory, f -> { + log.info("Creating connection mono to RabbitMQ Broker in host '" + f.getHost() + "'"); + return Mono.fromCallable(() -> f.newConnection( + appName + "-" + InstanceIdentifier.getInstanceId(SHARED_TYPE, "") + )) + .doOnError(err -> + log.log(Level.SEVERE, "Error creating connection to RabbitMQ Broker in host '" + + f.getHost() + "'. Starting retry process...", err) + ) + .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(START_INTERVAL)) + .maxBackoff(Duration.ofMillis(MAX_BACKOFF_INTERVAL))) + .cache(); + }); } // SSL based on RabbitConnectionFactoryBean diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java index a0be698f..29b3eea3 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java @@ -54,7 +54,14 @@ public class AsyncProps extends GenericAsyncProps { private Integer retryDelay = 1000; @Builder.Default - private boolean listenReplies = true; + private Boolean listenReplies = null; + + public Boolean getListenReplies() { + if (listenReplies == null) { + throw new IllegalArgumentException("The 'listenReplies' property is required, please specify a 'true' or 'false' value."); + } + return listenReplies; + } @Builder.Default private Boolean withDLQRetry = false; diff --git a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java index 7e9ee686..09dc8e28 100644 --- a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java +++ b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java @@ -80,6 +80,7 @@ void init() { IBrokerConfigProps configProps = new BrokerConfigProps(props); props.setBrokerConfigProps(configProps); props.setAppName("test"); + props.setListenReplies(Boolean.TRUE); brokerProvider = new RabbitMQBrokerProvider(DEFAULT_DOMAIN, props, brokerConfig, diff --git a/starters/async-rabbit-starter/src/test/resources/application.properties b/starters/async-rabbit-starter/src/test/resources/application.properties deleted file mode 100644 index 3d8d7db0..00000000 --- a/starters/async-rabbit-starter/src/test/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ -spring.application.name=test-app \ No newline at end of file diff --git a/starters/async-rabbit-starter/src/test/resources/application.yaml b/starters/async-rabbit-starter/src/test/resources/application.yaml new file mode 100644 index 00000000..803654f3 --- /dev/null +++ b/starters/async-rabbit-starter/src/test/resources/application.yaml @@ -0,0 +1,10 @@ +# Default App Name +spring: + application: + name: test-app + +# Async Props +app: + async: + app: # this is the name of the default domain + listenReplies: true \ No newline at end of file