Skip to content

Commit d2ec87d

Browse files
author
Daniel Bustamante Ospina
committed
Use MessageConverter in replyListener
Fixes: gh-10, gh-11
1 parent 6b91b0a commit d2ec87d

File tree

9 files changed

+48
-20
lines changed

9 files changed

+48
-20
lines changed

async/async-commons-standalone/src/main/java/org/reactivecommons/async/impl/config/DirectAsyncGatewayConfig.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.reactivecommons.async.impl.RabbitDirectAsyncGateway;
44
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
55
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
6+
import org.reactivecommons.async.impl.converters.MessageConverter;
67
import org.reactivecommons.async.impl.listeners.ApplicationReplyListener;
78
import org.reactivecommons.async.impl.reply.ReactiveReplyRouter;
89

@@ -20,8 +21,8 @@ public DirectAsyncGatewayConfig(String directMessagesExchangeName, String appNam
2021
this.appName = appName;
2122
}
2223

23-
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender) throws Exception {
24-
return new RabbitDirectAsyncGateway(config, router, rSender, directMessagesExchangeName);
24+
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender, MessageConverter converter) throws Exception {
25+
return new RabbitDirectAsyncGateway(config, router, rSender, directMessagesExchangeName, converter);
2526
}
2627

2728
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ReactiveMessageListener listener) {

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/ReplyCommandSender.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ public abstract class ReplyCommandSender {
1818
@Autowired
1919
DirectAsyncGateway asyncGateway;
2020

21-
private final ObjectMapper mapper = new ObjectMapper();
22-
2321
protected <R, C> Mono<R> sendCommand(C command, String commandId, Class<R> type) {
2422
AsyncQuery<C> asyncQuery = new AsyncQuery<C>(commandId, command);
2523
return asyncGateway.requestReply(asyncQuery, target(), type);

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/DirectAsyncGatewayConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.reactivecommons.async.impl.RabbitDirectAsyncGateway;
66
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
77
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
8+
import org.reactivecommons.async.impl.converters.MessageConverter;
89
import org.reactivecommons.async.impl.listeners.ApplicationReplyListener;
910
import org.reactivecommons.async.impl.reply.ReactiveReplyRouter;
1011
import org.springframework.beans.factory.annotation.Value;
@@ -14,7 +15,6 @@
1415
import org.springframework.util.Base64Utils;
1516

1617
import java.nio.ByteBuffer;
17-
import java.util.Base64;
1818
import java.util.UUID;
1919

2020
@Configuration
@@ -30,8 +30,8 @@ public class DirectAsyncGatewayConfig {
3030
private String appName;
3131

3232
@Bean
33-
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender) throws Exception {
34-
return new RabbitDirectAsyncGateway(config, router, rSender, directMessagesExchangeName);
33+
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender, MessageConverter converter) throws Exception {
34+
return new RabbitDirectAsyncGateway(config, router, rSender, directMessagesExchangeName, converter);
3535
}
3636

3737
@Bean

async/async-commons/src/main/java/org/reactivecommons/async/impl/RabbitDirectAsyncGateway.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package org.reactivecommons.async.impl;
22

3-
import com.fasterxml.jackson.databind.ObjectMapper;
43
import org.reactivecommons.api.domain.Command;
54
import org.reactivecommons.async.api.AsyncQuery;
65
import org.reactivecommons.async.api.DirectAsyncGateway;
76
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
87
import org.reactivecommons.async.impl.config.BrokerConfig;
8+
import org.reactivecommons.async.impl.converters.MessageConverter;
99
import org.reactivecommons.async.impl.reply.ReactiveReplyRouter;
1010
import reactor.core.publisher.Mono;
1111

@@ -20,19 +20,19 @@
2020

2121
public class RabbitDirectAsyncGateway implements DirectAsyncGateway {
2222

23-
private final ObjectMapper mapper = new ObjectMapper();
24-
2523
private final BrokerConfig config;
2624
private final ReactiveReplyRouter router;
2725
private final ReactiveMessageSender sender;
2826
private final String exchange;
27+
private final MessageConverter converter;
2928

3029

31-
public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender sender, String exchange) {
30+
public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender sender, String exchange, MessageConverter converter) {
3231
this.config = config;
3332
this.router = router;
3433
this.sender = sender;
3534
this.exchange = exchange;
35+
this.converter = converter;
3636
}
3737

3838
@Override
@@ -46,7 +46,7 @@ public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class
4646

4747
final Mono<R> replyHolder = router.register(correlationID)
4848
.timeout(Duration.ofSeconds(15))
49-
.flatMap(s -> fromCallable(() -> String.class.equals(type) ? type.cast(s) : mapper.readValue(s, type)));
49+
.flatMap(s -> fromCallable(() -> (R)converter.readAsyncQuery(s, type)));
5050

5151
Map<String, Object> headers = new HashMap<>();
5252
headers.put(REPLY_ID, config.getRoutingKey());

async/async-commons/src/main/java/org/reactivecommons/async/impl/converters/JacksonMessageConverter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,15 @@ public <T> Command<T> readCommand(Message message, Class<T> bodyClass) {
6060
}
6161
}
6262

63+
@Override
64+
public <T> T readValue(Message message, Class<T> valueClass) {
65+
try {
66+
return objectMapper.readValue(message.getBody(), valueClass);
67+
} catch (IOException e) {
68+
throw new MessageConversionException("Failed to convert Message content", e);
69+
}
70+
}
71+
6372
//TODO: pull definition up to interface
6473
public <T> Command<T> readCommandStructure(Message message) {
6574
try {

async/async-commons/src/main/java/org/reactivecommons/async/impl/converters/MessageConverter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public interface MessageConverter {
1313

1414
<T> Command<T> readCommand(Message message, Class<T> bodyClass);
1515

16+
<T> T readValue(Message message, Class<T> valueClass);
17+
1618
Message toMessage(Object object);
1719

1820
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationReplyListener.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.reactivecommons.async.impl.listeners;
22

33
import lombok.extern.java.Log;
4+
import org.reactivecommons.async.impl.RabbitMessage;
5+
import org.reactivecommons.async.impl.communications.Message;
46
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
57
import org.reactivecommons.async.impl.communications.TopologyCreator;
68
import org.reactivecommons.async.impl.reply.ReactiveReplyRouter;
@@ -37,7 +39,7 @@ public void startListening(String routeKey) {
3739
if (isEmpty) {
3840
router.routeEmpty(correlationID);
3941
} else {
40-
router.routeReply(correlationID, new String(delivery.getBody()));
42+
router.routeReply(correlationID, RabbitMessage.fromDelivery(delivery));
4143
}
4244
} catch (Exception e) {
4345
log.log(Level.SEVERE, "Error in reply reception", e);

async/async-commons/src/main/java/org/reactivecommons/async/impl/reply/ReactiveReplyRouter.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,38 @@
11
package org.reactivecommons.async.impl.reply;
22

3+
import org.reactivecommons.async.impl.communications.Message;
34
import reactor.core.publisher.Mono;
45
import reactor.core.publisher.UnicastProcessor;
56
import reactor.util.concurrent.Queues;
67

78
import java.util.concurrent.ConcurrentHashMap;
89

910
public class ReactiveReplyRouter {
10-
private final ConcurrentHashMap<String, UnicastProcessor<String>> processors = new ConcurrentHashMap<>();
11+
private final ConcurrentHashMap<String, UnicastProcessor<Message>> processors = new ConcurrentHashMap<>();
1112

12-
public Mono<String> register(String correlationID) {
13-
final UnicastProcessor<String> processor = UnicastProcessor.create(Queues.<String>one().get());
13+
public Mono<Message> register(String correlationID) {
14+
final UnicastProcessor<Message> processor = UnicastProcessor.create(Queues.<Message>one().get());
1415
processors.put(correlationID, processor);
1516
return processor.singleOrEmpty();
1617
}
1718

18-
public void routeReply(String correlationID, String data) {
19-
final UnicastProcessor<String> processor = processors.remove(correlationID);
19+
public void routeReply(String correlationID, Message data) {
20+
final UnicastProcessor<Message> processor = processors.remove(correlationID);
2021
if (processor != null) {
2122
processor.onNext(data);
2223
processor.onComplete();
2324
}
2425
}
2526

2627
public <E> void routeError(String correlationID, String data) {
27-
final UnicastProcessor<String> processor = processors.remove(correlationID);
28+
final UnicastProcessor<Message> processor = processors.remove(correlationID);
2829
if (processor != null) {
2930
processor.onError(new RuntimeException(data));
3031
}
3132
}
3233

3334
public void routeEmpty(String correlationID) {
34-
final UnicastProcessor<String> processor = processors.remove(correlationID);
35+
final UnicastProcessor<Message> processor = processors.remove(correlationID);
3536
if (processor != null) {
3637
processor.onComplete();
3738
}

async/async-commons/src/test/java/org/reactivecommons/async/impl/converters/JacksonMessageConverterTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,22 @@ public void shouldConvertWithUnknownProperties() throws JsonProcessingException
5353

5454
assertThat(command.getData()).extracting(SampleClass::getId, SampleClass::getName)
5555
.containsExactly("23", "one");
56+
}
5657

58+
@Test
59+
public void readValue() {
60+
Date date = new Date();
61+
final Message message = converter.toMessage(new SampleClass("35", "name1", date));
62+
final SampleClass value = converter.readValue(message, SampleClass.class);
63+
assertThat(value).extracting(SampleClass::getId, SampleClass::getName, SampleClass::getDate)
64+
.containsExactly("35", "name1", date);
65+
}
66+
67+
@Test
68+
public void readValueString() {
69+
final Message message = converter.toMessage("Hi!");
70+
final String value = converter.readValue(message, String.class);
71+
assertThat(value).isEqualTo("Hi!");
5772
}
5873

5974
@RequiredArgsConstructor

0 commit comments

Comments
 (0)