Skip to content

Commit c96ac09

Browse files
dbuosDaniel Bustamante Ospina
authored andcommitted
Tests and code fixes to enable CustomErrorReporter
1 parent de4303f commit c96ac09

26 files changed

+616
-109
lines changed

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/CommandListenersConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
77
import org.reactivecommons.async.impl.config.props.AsyncProps;
88
import org.reactivecommons.async.impl.converters.MessageConverter;
9+
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
910
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
1011
import org.springframework.beans.factory.annotation.Value;
1112
import org.springframework.context.annotation.Bean;
@@ -25,10 +26,11 @@ public class CommandListenersConfig {
2526
@Bean
2627
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener,
2728
HandlerResolver resolver, MessageConverter converter,
28-
DiscardNotifier discardNotifier) {
29+
DiscardNotifier discardNotifier,
30+
CustomErrorReporter errorReporter) {
2931
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
3032
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
31-
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier);
33+
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier, errorReporter);
3234

3335
commandListener.startListener();
3436

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/EventListenersConfig.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
77
import org.reactivecommons.async.impl.config.props.AsyncProps;
88
import org.reactivecommons.async.impl.converters.MessageConverter;
9+
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
910
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
1011
import org.springframework.beans.factory.annotation.Value;
1112
import org.springframework.context.annotation.Bean;
@@ -24,12 +25,12 @@ public class EventListenersConfig {
2425

2526
@Bean
2627
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter,
27-
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
28+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier, CustomErrorReporter errorReporter) {
2829

2930
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
3031
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
3132
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),asyncProps.getDomain().getEvents().getMaxLengthBytes(),
32-
discardNotifier);
33+
discardNotifier, errorReporter);
3334

3435
listener.startListener();
3536

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
1616
import org.reactivecommons.async.impl.config.props.AsyncProps;
1717
import org.reactivecommons.async.impl.converters.MessageConverter;
18+
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
1819
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
1920
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
2021
import org.reactivecommons.async.impl.listeners.ApplicationNotificationListener;
@@ -45,48 +46,51 @@ public class MessageListenersConfig {
4546

4647
@Bean //TODO: move to own config (QueryListenerConfig)
4748
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter,
48-
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
49+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier, CustomErrorReporter errorReporter) {
4950
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
5051
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
5152
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),
52-
asyncProps.getDomain().getEvents().getMaxLengthBytes(), discardNotifier);
53+
asyncProps.getDomain().getEvents().getMaxLengthBytes(), discardNotifier, errorReporter);
5354
listener.startListener();
5455
return listener;
5556
}
5657

5758
@Bean
5859
public ApplicationNotificationListener eventNotificationListener(HandlerResolver resolver, MessageConverter messageConverter,
59-
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
60+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier, CustomErrorReporter errorReporter) {
6061
final ApplicationNotificationListener listener = new ApplicationNotificationListener(
6162
receiver,
6263
asyncProps.getDomain().getEvents().getExchange(),
6364
asyncProps.getNotificationProps().getQueueName(appName),
6465
resolver,
6566
messageConverter,
66-
discardNotifier);
67+
discardNotifier,
68+
errorReporter);
6769
listener.startListener();
6870
return listener;
6971
}
7072

7173
@Bean //TODO: move to own config (QueryListenerConfig)
7274
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver,
7375
ReactiveMessageSender sender, ReactiveMessageListener rlistener,
74-
DiscardNotifier discardNotifier) {
76+
DiscardNotifier discardNotifier,
77+
CustomErrorReporter errorReporter) {
7578
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
7679
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
7780
asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
78-
asyncProps.getRetryDelay(), asyncProps.getGlobal().getMaxLengthBytes(), discardNotifier);
81+
asyncProps.getRetryDelay(), asyncProps.getGlobal().getMaxLengthBytes(), discardNotifier, errorReporter);
7982
listener.startListener();
8083
return listener;
8184
}
8285

8386
@Bean
8487
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener,
8588
HandlerResolver resolver, MessageConverter converter,
86-
DiscardNotifier discardNotifier) {
89+
DiscardNotifier discardNotifier,
90+
CustomErrorReporter errorReporter) {
8791
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
8892
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
89-
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier);
93+
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier, errorReporter);
9094
commandListener.startListener();
9195
return commandListener;
9296
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/NotificacionListenersConfig.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
77
import org.reactivecommons.async.impl.config.props.AsyncProps;
88
import org.reactivecommons.async.impl.converters.MessageConverter;
9-
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
9+
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
1010
import org.reactivecommons.async.impl.listeners.ApplicationNotificationListener;
1111
import org.springframework.beans.factory.annotation.Value;
1212
import org.springframework.context.annotation.Bean;
@@ -25,14 +25,15 @@ public class NotificacionListenersConfig {
2525

2626
@Bean
2727
public ApplicationNotificationListener eventNotificationListener(HandlerResolver resolver, MessageConverter messageConverter,
28-
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
28+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier, CustomErrorReporter errorReporter) {
2929
final ApplicationNotificationListener listener = new ApplicationNotificationListener(
3030
receiver,
3131
asyncProps.getDomain().getEvents().getExchange(),
3232
asyncProps.getNotificationProps().getQueueName(appName),
3333
resolver,
3434
messageConverter,
35-
discardNotifier);
35+
discardNotifier,
36+
errorReporter);
3637
listener.startListener();
3738
return listener;
3839
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/QueryListenerConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
88
import org.reactivecommons.async.impl.config.props.AsyncProps;
99
import org.reactivecommons.async.impl.converters.MessageConverter;
10+
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
1011
import org.reactivecommons.async.impl.listeners.ApplicationQueryListener;
1112
import org.springframework.beans.factory.annotation.Value;
1213
import org.springframework.context.annotation.Bean;
@@ -26,11 +27,12 @@ public class QueryListenerConfig {
2627
@Bean
2728
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver,
2829
ReactiveMessageSender sender, ReactiveMessageListener rlistener,
29-
DiscardNotifier discardNotifier) {
30+
DiscardNotifier discardNotifier,
31+
CustomErrorReporter errorReporter) {
3032
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
3133
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
3234
asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
33-
asyncProps.getRetryDelay(),asyncProps.getGlobal().getMaxLengthBytes(), discardNotifier);
35+
asyncProps.getRetryDelay(),asyncProps.getGlobal().getMaxLengthBytes(), discardNotifier, errorReporter);
3436

3537
listener.startListener();
3638

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44
import com.rabbitmq.client.ConnectionFactory;
55
import lombok.RequiredArgsConstructor;
66
import lombok.extern.java.Log;
7+
import org.reactivecommons.api.domain.Command;
8+
import org.reactivecommons.api.domain.DomainEvent;
79
import org.reactivecommons.api.domain.DomainEventBus;
8-
import org.reactivecommons.async.api.DefaultCommandHandler;
9-
import org.reactivecommons.async.api.DefaultQueryHandler;
10-
import org.reactivecommons.async.api.DynamicRegistry;
11-
import org.reactivecommons.async.api.HandlerRegistry;
10+
import org.reactivecommons.async.api.*;
1211
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1312
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1413
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
1514
import org.reactivecommons.async.impl.*;
15+
import org.reactivecommons.async.impl.communications.Message;
1616
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1717
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
1818
import org.reactivecommons.async.impl.communications.TopologyCreator;
@@ -22,6 +22,7 @@
2222
import org.reactivecommons.async.impl.converters.json.DefaultObjectMapperSupplier;
2323
import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter;
2424
import org.reactivecommons.async.impl.converters.json.ObjectMapperSupplier;
25+
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
2526
import org.springframework.beans.factory.annotation.Value;
2627
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2728
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -138,6 +139,27 @@ public DiscardNotifier rabbitDiscardNotifier(ObjectMapperSupplier objectMapperSu
138139
return new RabbitDiscardNotifier(domainEventBus(sender, props), objectMapperSupplier.get());
139140
}
140141

142+
@Bean
143+
@ConditionalOnMissingBean
144+
public CustomErrorReporter reactiveCommonsCustomErrorReporter() {
145+
return new CustomErrorReporter() {
146+
@Override
147+
public Mono<Void> reportError(Throwable ex, Message rawMessage, Command<?> message, boolean redelivered) {
148+
return Mono.empty();
149+
}
150+
151+
@Override
152+
public Mono<Void> reportError(Throwable ex, Message rawMessage, DomainEvent<?> message, boolean redelivered) {
153+
return Mono.empty();
154+
}
155+
156+
@Override
157+
public Mono<Void> reportError(Throwable ex, Message rawMessage, AsyncQuery<?> message, boolean redelivered) {
158+
return Mono.empty();
159+
}
160+
};
161+
}
162+
141163
private DomainEventBus domainEventBus(ReactiveMessageSender sender, BrokerConfigProps props) {
142164
final String exchangeName = props.getDomainEventsExchangeName();
143165
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe();

async/async-commons-starter/src/test/java/org/reactivecommons/async/impl/config/RabbitMqConfigTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,19 @@
33
import com.rabbitmq.client.Connection;
44
import com.rabbitmq.client.ConnectionFactory;
55
import org.junit.Test;
6+
import org.reactivecommons.api.domain.Command;
7+
import org.reactivecommons.api.domain.DomainEvent;
8+
import org.reactivecommons.async.api.AsyncQuery;
9+
import org.reactivecommons.async.impl.communications.Message;
10+
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
611
import reactor.test.StepVerifier;
712

813
import java.io.IOException;
914
import java.time.Duration;
1015
import java.util.concurrent.TimeoutException;
1116
import java.util.concurrent.atomic.AtomicInteger;
1217

18+
import static org.assertj.core.api.Assertions.*;
1319
import static org.mockito.Mockito.mock;
1420
import static org.mockito.Mockito.when;
1521

@@ -36,4 +42,12 @@ public void retryInitialConnection() throws IOException, TimeoutException {
3642
.thenAwait(Duration.ofMinutes(2))
3743
.expectNext(connection).verifyComplete();
3844
}
45+
46+
@Test
47+
public void shouldCreateDefaultErrorReporter() {
48+
final CustomErrorReporter errorReporter = config.reactiveCommonsCustomErrorReporter();
49+
assertThat(errorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(Command.class), true)).isNotNull();
50+
assertThat(errorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(DomainEvent.class), true)).isNotNull();
51+
assertThat(errorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(AsyncQuery.class), true)).isNotNull();
52+
}
3953
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/converters/MessageConverter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public interface MessageConverter {
1616
<T> T readValue(Message message, Class<T> valueClass);
1717

1818
<T> Command<T> readCommandStructure(Message message);
19+
<T> DomainEvent<T> readDomainEventStructure(Message message);
20+
<T> AsyncQuery<T> readAsyncQueryStructure(Message message);
1921

2022
Message toMessage(Object object);
2123

async/async-commons/src/main/java/org/reactivecommons/async/impl/converters/json/JacksonMessageConverter.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,25 @@ public <T> T readValue(Message message, Class<T> valueClass) {
6767
}
6868
}
6969

70-
//TODO: pull definition up to interface
70+
@Override
71+
@SuppressWarnings("unchecked")
7172
public <T> Command<T> readCommandStructure(Message message) {
72-
try {
73-
final CommandJson commandJson = objectMapper.readValue(message.getBody(), CommandJson.class);
74-
return new Command<>(commandJson.getName(), commandJson.getCommandId(), (T)commandJson.getData());
75-
} catch (IOException e) {
76-
throw new MessageConversionException("Failed to convert Message content", e);
77-
}
73+
final CommandJson commandJson = readValue(message, CommandJson.class);
74+
return new Command<>(commandJson.getName(), commandJson.getCommandId(), (T)commandJson.getData());
75+
}
76+
77+
@Override
78+
@SuppressWarnings("unchecked")
79+
public <T> DomainEvent<T> readDomainEventStructure(Message message) {
80+
final DomainEventJson eventJson = readValue(message, DomainEventJson.class);
81+
return new DomainEvent<>(eventJson.getName(), eventJson.getEventId(), (T)eventJson.getData());
82+
}
83+
84+
@Override
85+
@SuppressWarnings("unchecked")
86+
public <T> AsyncQuery<T> readAsyncQueryStructure(Message message) {
87+
final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class);
88+
return new AsyncQuery<>(asyncQueryJson.getResource(), (T)asyncQueryJson.getQueryData());
7889
}
7990

8091
@Override

async/async-commons/src/main/java/org/reactivecommons/async/impl/ext/CustomErrorReporter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,21 @@ public interface CustomErrorReporter {
1212
String EVENT_CLASS = "org.reactivecommons.api.domain.DomainEvent";
1313
String QUERY_CLASS = "org.reactivecommons.async.api.AsyncQuery";
1414

15-
default Mono<Void> reportError(Throwable ex, Message rawMessage, Object message) {
15+
default Mono<Void> reportError(Throwable ex, Message rawMessage, Object message, boolean redelivered) {
1616
switch (message.getClass().getName()){
1717
case COMMAND_CLASS:
18-
return reportError(ex, rawMessage, (Command<?>) message);
18+
return reportError(ex, rawMessage, (Command<?>) message, redelivered);
1919
case EVENT_CLASS:
20-
return reportError(ex, rawMessage, (DomainEvent<?>) message);
20+
return reportError(ex, rawMessage, (DomainEvent<?>) message, redelivered);
2121
case QUERY_CLASS:
22-
return reportError(ex, rawMessage, (AsyncQuery<?>) message);
22+
return reportError(ex, rawMessage, (AsyncQuery<?>) message, redelivered);
2323
default:
2424
return Mono.empty();
2525
}
2626
}
2727

28-
Mono<Void> reportError(Throwable ex, Message rawMessage, Command<?> message);
29-
Mono<Void> reportError(Throwable ex, Message rawMessage, DomainEvent<?> message);
30-
Mono<Void> reportError(Throwable ex, Message rawMessage, AsyncQuery<?> message);
28+
Mono<Void> reportError(Throwable ex, Message rawMessage, Command<?> message, boolean redelivered);
29+
Mono<Void> reportError(Throwable ex, Message rawMessage, DomainEvent<?> message, boolean redelivered);
30+
Mono<Void> reportError(Throwable ex, Message rawMessage, AsyncQuery<?> message, boolean redelivered);
3131

3232
}

0 commit comments

Comments
 (0)