11package org .reactivecommons .async .impl ;
22
3+ import com .fasterxml .jackson .core .JsonProcessingException ;
4+ import com .fasterxml .jackson .databind .ObjectMapper ;
35import lombok .Data ;
4- import org .assertj .core .api .Assertions ;
5- import org .junit .Before ;
66import org .junit .Test ;
7+ import org .junit .runner .RunWith ;
8+ import org .mockito .ArgumentCaptor ;
79import org .mockito .Mock ;
10+ import org .mockito .junit .MockitoJUnitRunner ;
811import org .reactivecommons .api .domain .Command ;
12+ import org .reactivecommons .async .api .AsyncQuery ;
13+ import org .reactivecommons .async .api .From ;
14+ import org .reactivecommons .async .impl .communications .Message ;
915import org .reactivecommons .async .impl .communications .ReactiveMessageSender ;
1016import org .reactivecommons .async .impl .config .BrokerConfig ;
1117import org .reactivecommons .async .impl .converters .MessageConverter ;
1420import org .reactivecommons .async .impl .reply .ReactiveReplyRouter ;
1521import org .reactivestreams .Publisher ;
1622import reactor .core .publisher .Flux ;
23+ import reactor .core .publisher .Mono ;
24+ import reactor .core .publisher .UnicastProcessor ;
1725import reactor .rabbitmq .OutboundMessage ;
1826import reactor .rabbitmq .OutboundMessageResult ;
1927import reactor .rabbitmq .Sender ;
28+ import reactor .test .StepVerifier ;
29+ import reactor .util .concurrent .Queues ;
2030
2131import java .util .List ;
32+ import java .util .Map ;
2233import java .util .UUID ;
2334import java .util .concurrent .Semaphore ;
2435import java .util .concurrent .ThreadLocalRandom ;
2536import java .util .stream .Collectors ;
2637import java .util .stream .IntStream ;
2738
28- import static org .junit .jupiter .api .Assertions .*;
39+ import static org .assertj .core .api .Assertions .assertThat ;
40+ import static org .mockito .ArgumentMatchers .*;
41+ import static org .mockito .Mockito .*;
42+ import static org .reactivecommons .async .impl .Headers .*;
2943
44+
45+ @ RunWith (MockitoJUnitRunner .class )
3046public class RabbitDirectAsyncGatewayTest {
3147
3248 private final BrokerConfig config = new BrokerConfig ();
33-
49+ private final Semaphore semaphore = new Semaphore (0 );
50+ private final MessageConverter converter = new JacksonMessageConverter (new DefaultObjectMapperSupplier ().get ());
3451 @ Mock
3552 private ReactiveReplyRouter router ;
36-
3753 @ Mock
38- private MessageConverter converter ;
39-
54+ private ReactiveMessageSender senderMock ;
4055 private RabbitDirectAsyncGateway asyncGateway ;
41- private final Semaphore semaphore = new Semaphore (0 );
4256
43- @ Before
44- public void init () {
45- ReactiveMessageSender sender = getReactiveMessageSender ();
57+ public void init (ReactiveMessageSender sender ) {
4658 asyncGateway = new RabbitDirectAsyncGateway (config , router , sender , "exchange" , converter );
4759 }
4860
4961 @ Test
5062 public void shouldSendInOptimalTime () throws InterruptedException {
63+ init (getReactiveMessageSender ());
64+
5165 int messageCount = 40000 ;
5266 final Flux <Command <DummyMessage >> messages = createMessagesHot (messageCount );
53- final Flux <Void > target = messages . flatMap ( dummyMessageCommand -> asyncGateway . sendCommand ( dummyMessageCommand , "testTarget" )
54- . doOnSuccess ( aVoid -> semaphore . release ()));
55-
67+ final Flux <Void > target =
68+ messages . flatMap ( dummyMessageCommand -> asyncGateway . sendCommand ( dummyMessageCommand , "testTarget" )
69+ . doOnSuccess ( aVoid -> semaphore . release ()));
5670
5771 final long init = System .currentTimeMillis ();
5872 target .subscribe ();
5973 semaphore .acquire (messageCount );
6074 final long end = System .currentTimeMillis ();
6175
6276 final long total = end - init ;
63- final double microsPerMessage = ((total + 0.0 )/ messageCount )* 1000 ;
77+ final double microsPerMessage = ((total + 0.0 ) / messageCount ) * 1000 ;
6478 System .out .println ("Message count: " + messageCount );
6579 System .out .println ("Total Execution Time: " + total + "ms" );
6680 System .out .println ("Microseconds per message: " + microsPerMessage + "us" );
67- Assertions .assertThat (microsPerMessage ).isLessThan (150 );
81+ assertThat (microsPerMessage ).isLessThan (150 );
82+ }
83+
84+ @ Test
85+ @ SuppressWarnings ("unchecked" )
86+ public void shouldReplyQuery () {
87+ // Arrange
88+ senderMock ();
89+
90+ From from = new From ();
91+ from .setReplyID ("replyId" );
92+ from .setCorrelationID ("correlationId" );
93+ DummyMessage response = new DummyMessage ();
94+ // Act
95+ Mono <Void > result = asyncGateway .reply (response , from );
96+ // Assert
97+ StepVerifier .create (result ).verifyComplete ();
98+ ArgumentCaptor <Map <String , Object >> headersCaptor = ArgumentCaptor .forClass (Map .class );
99+ verify (senderMock , times (1 ))
100+ .sendNoConfirm (eq (response ), eq ("globalReply" ), eq ("replyId" ), headersCaptor .capture (), anyBoolean ());
101+ assertThat (headersCaptor .getValue ().get (CORRELATION_ID )).isEqualTo ("correlationId" );
102+ }
103+
104+ @ Test
105+ @ SuppressWarnings ("unchecked" )
106+ public void shouldReplyQueryWithout () {
107+ // Arrange
108+ senderMock ();
109+
110+ From from = new From ();
111+ from .setReplyID ("replyId" );
112+ from .setCorrelationID ("correlationId" );
113+ // Act
114+ Mono <Void > result = asyncGateway .reply (null , from );
115+ // Assert
116+ StepVerifier .create (result ).verifyComplete ();
117+ ArgumentCaptor <Map <String , Object >> headersCaptor = ArgumentCaptor .forClass (Map .class );
118+ verify (senderMock , times (1 ))
119+ .sendNoConfirm (eq (null ), eq ("globalReply" ), eq ("replyId" ), headersCaptor .capture (), anyBoolean ());
120+ assertThat (headersCaptor .getValue ().get (CORRELATION_ID )).isEqualTo ("correlationId" );
121+ assertThat (headersCaptor .getValue ().get (COMPLETION_ONLY_SIGNAL )).isEqualTo (Boolean .TRUE .toString ());
122+ }
123+
124+ @ Test
125+ @ SuppressWarnings ("unchecked" )
126+ public void shouldHandleRequestReply () throws JsonProcessingException {
127+ // Arrange
128+ senderMock ();
129+ mockReply ();
130+
131+ String queryName = "my.query" ;
132+ String targetName = "app-target" ;
133+ AsyncQuery <DummyMessage > query = new AsyncQuery <>(queryName , new DummyMessage ());
134+ // Act
135+ Mono <DummyMessage > result = asyncGateway .requestReply (query , targetName , DummyMessage .class );
136+ // Assert
137+ StepVerifier .create (result )
138+ .assertNext (res -> assertThat (res .getName ()).startsWith ("Daniel" ))
139+ .verifyComplete ();
140+ ArgumentCaptor <Map <String , Object >> headersCaptor = ArgumentCaptor .forClass (Map .class );
141+ verify (senderMock , times (1 ))
142+ .sendNoConfirm (eq (query ), eq ("exchange" ), eq ("app-target.query" ), headersCaptor .capture (),
143+ anyBoolean ());
144+ assertThat (headersCaptor .getValue ().get (REPLY_ID ).toString ().length ()).isEqualTo (32 );
145+ assertThat (headersCaptor .getValue ().get (CORRELATION_ID ).toString ().length ()).isEqualTo (32 );
146+ }
147+
148+ private void senderMock () {
149+ init (senderMock );
150+ when (senderMock .sendNoConfirm (any (), anyString (), anyString (), anyMap (), anyBoolean ()))
151+ .thenReturn (Mono .empty ());
152+ }
153+
154+ private void mockReply () throws JsonProcessingException {
155+ Message message = mock (Message .class );
156+ ObjectMapper mapper = new ObjectMapper ();
157+ when (message .getBody ()).thenReturn (mapper .writeValueAsString (new DummyMessage ()).getBytes ());
158+ final UnicastProcessor <Message > processor = UnicastProcessor .create (Queues .<Message >one ().get ());
159+ processor .onNext (message );
160+ processor .onComplete ();
161+ when (router .register (anyString ())).thenReturn (processor .singleOrEmpty ());
68162 }
69163
70164 private ReactiveMessageSender getReactiveMessageSender () {
71- MessageConverter messageConverter = new JacksonMessageConverter (new DefaultObjectMapperSupplier ().get ());
72165 Sender sender = new StubSender ();
73- ReactiveMessageSender reactiveSender = new ReactiveMessageSender (sender , "sourceApplication" , messageConverter , null );
74- return reactiveSender ;
166+ return new ReactiveMessageSender (sender , "sourceApplication" , converter , null );
167+ }
168+
169+ private Flux <Command <DummyMessage >> createMessagesHot (int count ) {
170+ final List <Command <DummyMessage >> commands = IntStream .range (0 , count ).mapToObj (value -> new Command <>("app" +
171+ ".command.test" , UUID .randomUUID ().toString (), new DummyMessage ())).collect (Collectors .toList ());
172+ return Flux .fromIterable (commands );
75173 }
76174
77175 static class StubSender extends Sender {
@@ -80,15 +178,14 @@ static class StubSender extends Sender {
80178 public <OMSG extends OutboundMessage > Flux <OutboundMessageResult <OMSG >> sendWithTypedPublishConfirms (Publisher <OMSG > messages ) {
81179 return Flux .from (messages ).map (omsg -> new OutboundMessageResult <>(omsg , true ));
82180 }
83- }
84-
85181
86- private Flux <Command <DummyMessage >> createMessagesHot (int count ) {
87- final List <Command <DummyMessage >> commands = IntStream .range (0 , count ).mapToObj (value -> new Command <>("app.command.test" , UUID .randomUUID ().toString (), new DummyMessage ())).collect (Collectors .toList ());
88- return Flux .fromIterable (commands );
182+ @ Override
183+ @ SuppressWarnings ("rawtypes" )
184+ public Flux <OutboundMessageResult > sendWithPublishConfirms (Publisher <OutboundMessage > messages ) {
185+ return Flux .from (messages ).map (omsg -> new OutboundMessageResult <>(omsg , true ));
186+ }
89187 }
90188
91-
92189 @ Data
93190 static class DummyMessage {
94191 private String name = "Daniel" + ThreadLocalRandom .current ().nextLong ();
0 commit comments