Skip to content

Commit de22a43

Browse files
author
Daniel Bustamante Ospina
committed
Change Rabbit Message delivery Mode (2)
1 parent edae698 commit de22a43

File tree

12 files changed

+10
-162
lines changed

12 files changed

+10
-162
lines changed

async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/ReactiveMessageSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ private AMQP.BasicProperties buildMessageProperties(Message message, Map<String,
5151
.contentType(properties.getContentType())
5252
.appId(sourceApplication)
5353
.contentEncoding(properties.getContentEncoding())
54-
.deliveryMode(1)
54+
.deliveryMode(2)
5555
.timestamp(new Date())
5656
.messageId(UUID.randomUUID().toString())
5757
.headers(baseHeaders).build();

async/async-commons/src/main/java/org/reactivecommons/async/impl/converters/JacksonMessageConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public <T> T readValue(Message message, Class<T> valueClass) {
7373
public <T> Command<T> readCommandStructure(Message message) {
7474
try {
7575
final CommandJson commandJson = objectMapper.readValue(message.getBody(), CommandJson.class);
76-
return new Command<>(commandJson.getName(), commandJson.getCommandId(), null);
76+
return new Command<>(commandJson.getName(), commandJson.getCommandId(), (T)commandJson.getData());
7777
} catch (IOException e) {
7878
throw new MessageConversionException("Failed to convert Message content", e);
7979
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/converters/MessageConverter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public interface MessageConverter {
1515

1616
<T> T readValue(Message message, Class<T> valueClass);
1717

18+
<T> Command<T> readCommandStructure(Message message);
19+
1820
Message toMessage(Object object);
1921

2022
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationCommandListener.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,8 @@ protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath)
5151
return msj -> executor.execute(msj).cast(Object.class);
5252
}
5353

54-
//TODO: replace with interface
5554
protected String getExecutorPath(AcknowledgableDelivery msj) {
56-
final Command<Object> command = ((JacksonMessageConverter) messageConverter).readCommandStructure(RabbitMessage.fromDelivery(msj));
55+
final Command<Object> command = messageConverter.readCommandStructure(RabbitMessage.fromDelivery(msj));
5756
return command.getName();
5857
}
5958

domain/domain-events/src/main/java/org/reactivecommons/api/domain/Command.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivecommons.api.domain;
22

3+
34
import lombok.AllArgsConstructor;
45
import lombok.Data;
56

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
app.async.reply.prefix=sample-receiver.
21
spring.application.name=Receiver2

samples/async/sender-client/src/main/java/sample/SampleSenderApp.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package sample;
22

33
import lombok.extern.java.Log;
4+
import org.reactivecommons.api.domain.Command;
45
import org.reactivecommons.async.api.AsyncQuery;
56
import org.reactivecommons.async.api.DirectAsyncGateway;
67
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
@@ -25,6 +26,8 @@ public static void main(String[] args) throws InterruptedException {
2526

2627
@Bean
2728
public CommandLineRunner run(MemberRegistrySender sender, DirectAsyncGateway asyncGateway) {
29+
Command<String> command0 = new Command<>("test", "01", "Daniel");
30+
asyncGateway.sendCommand(command0, "Receiver2").repeat(5).subscribe();
2831
return args -> Flux.interval(Duration.ofSeconds(1)).concatMap(n -> {
2932
AddMemberCommand command = new AddMemberCommand("Daniel " + n, n+"");
3033
return asyncGateway.requestReply(new AsyncQuery<>("serveQuery.empty", "test"), "Receiver2", String.class)
Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1 @@
1-
app.async.reply.prefix=sender-client.
2-
spring.application.name=Sender
3-
4-
spring.cloud.stream.bindings.input.group=sample1
5-
spring.cloud.stream.bindings.input.destination=Sender
6-
spring.cloud.stream.bindings.input.consumer.concurrency=12
1+
spring.application.name=Sender

utils/object-mapper/object-mapper.gradle

Lines changed: 0 additions & 76 deletions
This file was deleted.

utils/object-mapper/src/main/java/org/reactivecommons/utils/LombokBuilderNameTransformer.java

Lines changed: 0 additions & 20 deletions
This file was deleted.

0 commit comments

Comments
 (0)