Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ class UnroutableMessageNotifierTest {

@BeforeEach
void setUp() {
// Usar el constructor por defecto y espiar el sink interno
// Use the default constructor and spy on the internal sink
unroutableMessageNotifier = new UnroutableMessageNotifier();
// Inyectar el mock del sink usando un spy para poder verificarlo
// Inject the sink mock using a spy to verify it
try {
java.lang.reflect.Field sinkField = UnroutableMessageNotifier.class.getDeclaredField("sink");
sinkField.setAccessible(true);
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ plugins {
id 'org.sonarqube' version '6.3.1.5724'
id 'org.springframework.boot' version '3.5.5' apply false
id 'io.github.gradle-nexus.publish-plugin' version '2.0.0'
id 'co.com.bancolombia.cleanArchitecture' version '3.25.0'
id 'co.com.bancolombia.cleanArchitecture' version '3.26.1'
}

repositories {
Expand Down
4 changes: 2 additions & 2 deletions main.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ allprojects {
property 'sonar.organization', 'reactive-commons'
property 'sonar.host.url', 'https://sonarcloud.io'
property "sonar.sources", "src/main"
property "sonar.test", "src/test"
property "sonar.tests", "src/test"
property "sonar.java.binaries", "build/classes"
property "sonar.junit.reportPaths", "build/test-results/test"
property "sonar.java-coveragePlugin", "jacoco"
Expand Down Expand Up @@ -86,7 +86,7 @@ subprojects {

dependencyManagement {
imports {
mavenBom 'org.springframework.boot:spring-boot-dependencies:3.5.4'
mavenBom 'org.springframework.boot:spring-boot-dependencies:3.5.5'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public GenericAsyncPropsDomain(String defaultAppName,
this.asyncPropsClass = asyncPropsClass;
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
this.computeIfAbsent(DEFAULT_DOMAIN, k -> {
T defaultApp = AsyncPropsDomainBuilder.instantiate(asyncPropsClass);
defaultApp.setConnectionProperties(mapper.convertValue(defaultProperties, propsClass));
return defaultApp;
});

if (!this.containsKey(DEFAULT_DOMAIN)) {
throw new InvalidConfigurationException("Required domain '" + DEFAULT_DOMAIN + "' is not configured.");
}

super.forEach((key, value) -> { // To ensure that each domain has an appName
if (value.getAppName() == null) {
if (defaultAppName == null || defaultAppName.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ void shouldCreateProps() {
String defaultAppName = "sample";
MyBrokerConnProps defaultMyBrokerProps = new MyBrokerConnProps();
AsyncMyBrokerPropsDomainProperties configured = new AsyncMyBrokerPropsDomainProperties();
configured.put(DEFAULT_DOMAIN, new MyBrokerAsyncProps());

MyBrokerAsyncProps other = new MyBrokerAsyncProps();
other.setAppName(OTHER);
configured.put(OTHER, other);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void listenQueries(HandlerResolver resolver) {

@Override
public void listenReplies() {
if (props.isListenReplies()) {
if (Boolean.TRUE.equals(props.getListenReplies())) {
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router,
receiver,
props.getBrokerConfigProps().getReplyQueue(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.rabbitmq.client.ConnectionFactory;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.java.Log;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.commons.DLQDiscardNotifier;
Expand Down Expand Up @@ -50,18 +49,21 @@
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;

@Log
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class RabbitMQSetupUtils {
private static final String LISTENER_TYPE = "listener";
private static final String TOPOLOGY_TYPE = "topology";
private static final String SENDER_TYPE = "sender";
private static final String SHARED_TYPE = "shared";
private static final String DEFAULT_PROTOCOL;
public static final int START_INTERVAL = 300;
public static final int MAX_BACKOFF_INTERVAL = 3000;

private static final ConcurrentMap<RabbitProperties, ConnectionFactory> FACTORY_CACHE = new ConcurrentHashMap<>();
private static final ConcurrentMap<ConnectionFactory, Mono<Connection>> CONNECTION_CACHE = new ConcurrentHashMap<>();

static {
String protocol = "TLSv1.1";
try {
Expand All @@ -78,17 +80,23 @@ public final class RabbitMQSetupUtils {
DEFAULT_PROTOCOL = protocol;
}

@SneakyThrows
public static ConnectionFactoryProvider connectionFactoryProvider(RabbitProperties properties) {
final ConnectionFactory factory = new ConnectionFactory();
PropertyMapper map = PropertyMapper.get();
map.from(properties::determineHost).whenNonNull().to(factory::setHost);
map.from(properties::determinePort).to(factory::setPort);
map.from(properties::determineUsername).whenNonNull().to(factory::setUsername);
map.from(properties::determinePassword).whenNonNull().to(factory::setPassword);
map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost);
factory.useNio();
setUpSSL(factory, properties);
final ConnectionFactory factory = FACTORY_CACHE.computeIfAbsent(properties, props -> {
try {
ConnectionFactory newFactory = new ConnectionFactory();
PropertyMapper map = PropertyMapper.get();
map.from(props::determineHost).whenNonNull().to(newFactory::setHost);
map.from(props::determinePort).to(newFactory::setPort);
map.from(props::determineUsername).whenNonNull().to(newFactory::setUsername);
map.from(props::determinePassword).whenNonNull().to(newFactory::setPassword);
map.from(props::determineVirtualHost).whenNonNull().to(newFactory::setVirtualHost);
newFactory.useNio();
setUpSSL(newFactory, props);
return newFactory;
} catch (Exception e) {
throw new RuntimeException("Error creating ConnectionFactory: ", e);
}
});
return () -> factory;
}

Expand All @@ -107,7 +115,7 @@ public static ReactiveMessageSender createMessageSender(ConnectionFactoryProvide

public static ReactiveMessageListener createMessageListener(ConnectionFactoryProvider provider, AsyncProps props) {
final Mono<Connection> connection =
createConnectionMono(provider.getConnectionFactory(), props.getAppName(), LISTENER_TYPE);
createConnectionMono(provider.getConnectionFactory(), props.getAppName());
final Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connection));
final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connection));

Expand All @@ -119,8 +127,7 @@ public static ReactiveMessageListener createMessageListener(ConnectionFactoryPro

public static TopologyCreator createTopologyCreator(AsyncProps props) {
ConnectionFactoryProvider provider = connectionFactoryProvider(props.getConnectionProperties());
final Mono<Connection> connection = createConnectionMono(provider.getConnectionFactory(),
props.getAppName(), TOPOLOGY_TYPE);
final Mono<Connection> connection = createConnectionMono(provider.getConnectionFactory(), props.getAppName());
final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connection));
return new TopologyCreator(sender, props.getQueueType());
}
Expand All @@ -134,8 +141,7 @@ public static DiscardNotifier createDiscardNotifier(ReactiveMessageSender sender

private static SenderOptions reactiveCommonsSenderOptions(String appName, ConnectionFactoryProvider provider,
RabbitProperties rabbitProperties) {
final Mono<Connection> senderConnection = createConnectionMono(provider.getConnectionFactory(), appName,
SENDER_TYPE);
final Mono<Connection> senderConnection = createConnectionMono(provider.getConnectionFactory(), appName);
final ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions();
final PropertyMapper map = PropertyMapper.get();

Expand All @@ -153,18 +159,20 @@ private static SenderOptions reactiveCommonsSenderOptions(String appName, Connec
.transform(Utils::cache));
}

private static Mono<Connection> createConnectionMono(ConnectionFactory factory, String connectionPrefix,
String connectionType) {
log.info("Creating connection mono to RabbitMQ Broker in host '" + factory.getHost() + "' with " +
"type: " + connectionType);
return Mono.fromCallable(() -> factory.newConnection(connectionPrefix + " " + connectionType))
.doOnError(err ->
log.log(Level.SEVERE, "Error creating connection to RabbitMQ Broker in host '" +
factory.getHost() + "'. Starting retry process...", err)
)
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(START_INTERVAL))
.maxBackoff(Duration.ofMillis(MAX_BACKOFF_INTERVAL)))
.cache();
private static Mono<Connection> createConnectionMono(ConnectionFactory factory, String appName) {
return CONNECTION_CACHE.computeIfAbsent(factory, f -> {
log.info("Creating connection mono to RabbitMQ Broker in host '" + f.getHost() + "'");
return Mono.fromCallable(() -> f.newConnection(
appName + "-" + InstanceIdentifier.getInstanceId(SHARED_TYPE, "")
))
.doOnError(err ->
log.log(Level.SEVERE, "Error creating connection to RabbitMQ Broker in host '"
+ f.getHost() + "'. Starting retry process...", err)
)
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(START_INTERVAL))
.maxBackoff(Duration.ofMillis(MAX_BACKOFF_INTERVAL)))
.cache();
});
}

// SSL based on RabbitConnectionFactoryBean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,14 @@ public class AsyncProps extends GenericAsyncProps<RabbitProperties> {
private Integer retryDelay = 1000;

@Builder.Default
private boolean listenReplies = true;
private Boolean listenReplies = null;

public Boolean getListenReplies() {
if (listenReplies == null) {
throw new IllegalArgumentException("The 'listenReplies' property is required, please specify a 'true' or 'false' value.");
}
return listenReplies;
}

@Builder.Default
private Boolean withDLQRetry = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ void init() {
IBrokerConfigProps configProps = new BrokerConfigProps(props);
props.setBrokerConfigProps(configProps);
props.setAppName("test");
props.setListenReplies(Boolean.TRUE);
brokerProvider = new RabbitMQBrokerProvider(DEFAULT_DOMAIN,
props,
brokerConfig,
Expand Down

This file was deleted.

10 changes: 10 additions & 0 deletions starters/async-rabbit-starter/src/test/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Default App Name
spring:
application:
name: test-app

# Async Props
app:
async:
app: # this is the name of the default domain
listenReplies: true