@@ -307,7 +307,7 @@ Resultado:
307307
308308# ## Ventajas
309309
310- - Detección temprana de errores de enrutamiento : Evita que mensajes críticos “desaparezcan” sin rastro lo que facilita
310+ - Detección temprana de errores de enrutamiento : Evita que mensajes críticos “desaparezcan” sin rastro lo que facilita
311311 la identificación de configuraciones erróneas en bindings o patrones.
312312- Integridad y fiabilidad : Garantiza que cada mensaje encuentre un consumidor o, en su defecto, regrese al productor
313313 para un manejo alternativo (colas DLQ, logs, base de datos).
@@ -320,8 +320,8 @@ Aunque esta propiedad no evita problemas de rendimiento o degradación del clús
320320pérdida de mensajes no enrutados y para detectar errores de configuración en el enrutamiento.
321321
322322Cuando mandatory está activo, en condiciones normales (todas las rutas existen) no hay prácticamente impacto. En
323- situaciones anómalas, habrá un tráfico adicional de retorno por cada mensaje no enrutable. Esto supone carga extra
324- tanto para RabbitMQ (que debe enviar de vuelta el mensaje al productor) como para la aplicación emisora (que debe
323+ situaciones anómalas, habrá un tráfico adicional de retorno por cada mensaje no enrutable. Esto supone carga extra
324+ tanto para RabbitMQ (que debe enviar de vuelta el mensaje al productor) como para la aplicación emisora (que debe
325325procesar el mensaje devuelto).
326326
327327# ## Implementación
@@ -347,6 +347,7 @@ package sample;
347347import co.com.mypackage.usecase.MyUseCase;
348348import lombok.RequiredArgsConstructor;
349349import lombok.extern.java.Log;
350+ import org.reactivecommons.async.rabbit.communications.MyOutboundMessage;
350351import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
351352import org.springframework.stereotype.Component;
352353import reactor.core.publisher.Mono;
@@ -362,7 +363,7 @@ public class ResendUnroutableMessageHandler implements UnroutableMessageHandler
362363 private final MyUseCase useCase;
363364
364365 @Override
365- public Mono<Void> processMessage(OutboundMessageResult result) {
366+ public Mono<Void> processMessage(OutboundMessageResult<MyOutboundMessage> result) {
366367 var returned = result.getOutboundMessage();
367368 log.severe("Unroutable message: exchange=" + returned.getExchange()
368369 + ", routingKey=" + returned.getRoutingKey()
@@ -376,26 +377,89 @@ public class ResendUnroutableMessageHandler implements UnroutableMessageHandler
376377}
377378` ` `
378379
379- # ### Reenviar mensajes no enrutados a una cola
380+ # ### Enviar mensajes no enrutados a una cola
380381
381- Si queremos volver a enviar el mensaje a una cola es importante tener en cuenta que si la cola no existe el mensaje se
382- perderá, por lo que antes de enviar el mensaje debemos asegurarnos de que la cola esté creada.
382+ Para enviar el mensaje no enrutado a una cola, utilizamos las anotaciones `@EnableDomainEventBus` para
383+ [eventos de dominio](/reactive-commons-java/docs/reactive-commons/sending-a-domain-event), y `@EnableDirectAsyncGateway`
384+ para [comandos](/reactive-commons-java/docs/reactive-commons/sending-a-command) y
385+ [consultas asíncronas](/reactive-commons-java/docs/reactive-commons/making-an-async-query), según corresponda.
383386
384- En la clase de configuración de RabbitMQ creamos el bean `ReactiveMessageSender` para obtener la conexión al
385- broker y usamos las propiedades de conexión definidas a través del bean `AsyncRabbitPropsDomainProperties` para indicar
386- a que broker nos conectamos :
387+ Es importante asegurarse de que la cola exista antes de enviar el mensaje, ya que, de lo contrario, este se perderá.
388+ Por lo tanto, se recomienda verificar o crear la cola previamente para garantizar una entrega exitosa.
387389
388390` ` ` java
389391package sample;
390392
393+ import com.fasterxml.jackson.core.type.TypeReference;
394+ import com.fasterxml.jackson.databind.JsonNode;
391395import com.fasterxml.jackson.databind.ObjectMapper;
392- import org.reactivecommons.async.rabbit.RabbitMQSetupUtils;
393- import org.reactivecommons.async.rabbit.RabbitMQFactory;
394- import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
396+ import org.reactivecommons.api.domain.Command;
397+ import org.reactivecommons.async.api.DirectAsyncGateway;
398+ import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
399+ import org.reactivecommons.async.rabbit.communications.MyOutboundMessage;
395400import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
401+ import org.springframework.beans.factory.annotation.Value;
402+ import org.springframework.stereotype.Component;
403+ import reactor.core.publisher.Mono;
404+ import reactor.rabbitmq.OutboundMessage;
405+ import reactor.rabbitmq.OutboundMessageResult;
406+
407+ @Component
408+ @EnableDirectAsyncGateway
409+ public class ResendUnroutableMessageHandler implements UnroutableMessageHandler {
410+
411+ private final ObjectMapper objectMapper;
412+ private final String retryQueueName;
413+ private final DirectAsyncGateway gateway;
414+
415+ public ResendUnroutableMessageHandler(
416+ ObjectMapper objectMapper,
417+ DirectAsyncGateway gateway,
418+ @Value("${adapters.rabbitmq.retry-queue-name}") String retryQueueName) {
419+ this.objectMapper = objectMapper;
420+ this.retryQueueName = retryQueueName;
421+ this.gateway = gateway;
422+ }
423+
424+ public Mono<Void> emitCommand(String name, String commandId, Object data) {
425+ return Mono.from(gateway.sendCommand(
426+ // Connection with broker using the properties defined through the
427+ // AsyncRabbitPropsDomainProperties bean with the "logs" domain
428+ new Command<>(name, commandId, data), retryQueueName, "logs")
429+ );
430+ }
431+
432+ @Override
433+ public Mono<Void> processMessage(OutboundMessageResult<MyOutboundMessage> result) {
434+ OutboundMessage returned = result.getOutboundMessage();
435+ try {
436+ // The unroutable message is a command, so the message body is deserialized to the Command class.
437+ // Use the DomainEvent class for domain events and the AsyncQuery class for asynchronous queries.
438+ Command<JsonNode> command = objectMapper.readValue(returned.getBody(), new TypeReference<>() {
439+ });
440+
441+ // Send the message to the queue
442+ return emitCommand(command.getName(), command.getCommandId(), command.getData())
443+ .doOnError(e -> log.severe("Failed to send the returned message: " + e.getMessage()));
444+ } catch (Exception e) {
445+ log.severe("Error deserializing the returned message: " + e.getMessage());
446+ return Mono.empty();
447+ }
448+ }
449+ }
450+ ` ` `
451+
452+ En la clase de configuración de RabbitMQ creamos el bean `UnroutableMessageProcessor` para registrar el handler de mensajes no enrutados.
453+
454+ ` ` ` java
455+ package sample;
456+
457+ import org.reactivecommons.async.rabbit.communications.UnroutableMessageNotifier;
458+ import org.reactivecommons.async.rabbit.communications.UnroutableMessageProcessor;
396459import org.reactivecommons.async.rabbit.config.RabbitProperties;
397460import org.reactivecommons.async.rabbit.config.props.AsyncProps;
398461import org.reactivecommons.async.rabbit.config.props.AsyncRabbitPropsDomainProperties;
462+ import org.springframework.beans.factory.annotation.Qualifier;
399463import org.springframework.beans.factory.annotation.Value;
400464import org.springframework.context.annotation.Bean;
401465import org.springframework.context.annotation.Configuration;
@@ -411,7 +475,7 @@ public class RabbitMQConfig {
411475 private final Integer retryDelay;
412476
413477 public RabbitMQConfig(@Qualifier("rabbit") RabbitMQConnectionProperties properties,
414- @Qualifier("rabbitDual ") RabbitMQConnectionProperties propertiesLogs,
478+ @Qualifier("rabbitLogs ") RabbitMQConnectionProperties propertiesLogs,
415479 @Value("${adapters.rabbitmq.withDLQRetry}") Boolean withDLQRetry,
416480 @Value("${adapters.rabbitmq.maxRetries}") Integer maxRetries,
417481 @Value("${adapters.rabbitmq.retryDelay}") Integer retryDelay) {
@@ -422,6 +486,7 @@ public class RabbitMQConfig {
422486 this.retryDelay = retryDelay;
423487 }
424488
489+ // This bean is used to create the RabbitMQ connection properties for the application
425490 @Bean
426491 @Primary
427492 public AsyncRabbitPropsDomainProperties customDomainProperties() {
@@ -456,64 +521,14 @@ public class RabbitMQConfig {
456521 .build();
457522 }
458523
459- // This bean is used to create the RabbitMQ connection and the message sender
524+ // This bean is used to register the handler for unroutable messages
460525 @Bean
461- public ReactiveMessageSender reactiveMessageSender(ReactiveMQFactory providerFactory,
462- AsyncRabbitPropsDomainProperties properties) {
463- return providerFactory.createMessageSenderFromProperties(properties.get("app"));
526+ UnroutableMessageProcessor registerUnroutableMessageHandler(UnroutableMessageNotifier unroutableMessageNotifier,
527+ ResendUnroutableMessageHandler handler) {
528+ var factory = new UnroutableMessageProcessor();
529+ unroutableMessageNotifier.listenToUnroutableMessages(handler);
530+ return factory;
464531 }
465532
466533}
467534` ` `
468-
469- Ahora, en el handler de mensajes no enrutados podemos enviar el mensaje a una cola específica :
470- ` ` ` java
471- package sample;
472-
473- import com.fasterxml.jackson.core.type.TypeReference;
474- import com.fasterxml.jackson.databind.JsonNode;
475- import com.fasterxml.jackson.databind.ObjectMapper;
476- import lombok.extern.java.Log;
477- import org.reactivecommons.api.domain.Command;
478- import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
479- import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
480- import org.springframework.beans.factory.annotation.Value;
481- import org.springframework.stereotype.Component;
482- import reactor.core.publisher.Mono;
483- import reactor.rabbitmq.OutboundMessage;
484- import reactor.rabbitmq.OutboundMessageResult;
485-
486- @Log
487- @Component
488- public class ResendUnroutableMessageHandler implements UnroutableMessageHandler {
489-
490- private final ReactiveMessageSender sender;
491- private final ObjectMapper objectMapper;
492- private final String retryQueueName;
493-
494- public ResendUnroutableMessageHandler(ReactiveMessageSender sender,
495- ObjectMapper objectMapper,
496- @Value("${adapters.rabbitmq.retry-queue-name}") String retryQueueName) {
497- this.sender = sender;
498- this.objectMapper = objectMapper;
499- this.retryQueueName = retryQueueName;
500- }
501-
502- @Override
503- public Mono<Void> processMessage(OutboundMessageResult result) {
504- OutboundMessage returned = result.getOutboundMessage();
505- try {
506- // The message we get is of command type, so the message body is deserialized to the Command class.
507- // For domain events the DomainEvent class should be used and for asynchronous queries the AsyncQuery class.
508- Command<JsonNode> command = objectMapper.readValue(returned.getBody(), new TypeReference<>() {
509- });
510-
511- // Send the message to the queue
512- return sender.sendMessage(command, returned.getExchange(), retryQueueName, returned.getProperties().getHeaders());
513- } catch (Exception e) {
514- log.severe("Error deserializing the returned message: " + e.getMessage());
515- return Mono.empty();
516- }
517- }
518- }
519- ` ` `
0 commit comments