22
33import static java .util .concurrent .TimeUnit .MINUTES ;
44import static java .util .concurrent .TimeUnit .SECONDS ;
5- import static org .assertj .core .api .Assertions .assertThat ;
65
76import com .fasterxml .jackson .databind .ObjectMapper ;
87import com .jashmore .sqs .QueueProperties ;
2423import com .jashmore .sqs .retriever .prefetch .PrefetchingMessageRetriever ;
2524import com .jashmore .sqs .retriever .prefetch .StaticPrefetchingMessageRetrieverProperties ;
2625import com .jashmore .sqs .test .LocalSqsRule ;
27- import it .com .jashmore .sqs .AbstractSqsIntegrationTest ;
26+ import it .com .jashmore .sqs .listener . util . SqsIntegrationTestUtils ;
2827import lombok .extern .slf4j .Slf4j ;
29- import org .junit .After ;
3028import org .junit .Before ;
3129import org .junit .Rule ;
3230import org .junit .Test ;
3331import software .amazon .awssdk .services .sqs .SqsAsyncClient ;
3432
3533import java .util .concurrent .CountDownLatch ;
3634import java .util .concurrent .Executors ;
37- import java .util .concurrent .atomic .AtomicInteger ;
3835
3936@ Slf4j
40- public class ConcurrentMessageBrokerIntegrationTest extends AbstractSqsIntegrationTest {
37+ public class ConcurrentMessageBrokerIntegrationTest {
4138 private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ();
4239 private static final PayloadMapper PAYLOAD_MAPPER = new JacksonPayloadMapper (OBJECT_MAPPER );
4340
4441 @ Rule
4542 public LocalSqsRule localSqsRule = new LocalSqsRule ();
4643
4744 private String queueUrl ;
48- private SqsAsyncClient sqsAsyncClient ;
4945 private QueueProperties queueProperties ;
5046 private ArgumentResolverService argumentResolverService ;
5147
5248 @ Before
5349 public void setUp () {
5450 queueUrl = localSqsRule .createRandomQueue ();
5551 queueProperties = QueueProperties .builder ().queueUrl (queueUrl ).build ();
56- sqsAsyncClient = localSqsRule .getLocalAmazonSqsAsync ();
57- argumentResolverService = new CoreArgumentResolverService (PAYLOAD_MAPPER , sqsAsyncClient );
58- }
59-
60- @ After
61- public void tearDown () {
62- // If the thread running the tests is interrupted it will break future tests. This will be fixed in release of JUnit 4.13 but until then
63- // we use this workaround. See https://github.com/junit-team/junit4/issues/1365
64- Thread .interrupted ();
65- }
66-
67- @ Test
68- public void concurrentListenerCanConsumeMultipleMessagesFromQueueAtOnce () throws Exception {
69- // arrange
70- final int concurrencyLevel = 5 ;
71- final MessageRetriever messageRetriever = new IndividualMessageRetriever (
72- sqsAsyncClient ,
73- queueProperties ,
74- IndividualMessageRetrieverProperties .builder ()
75- .visibilityTimeoutForMessagesInSeconds (5 )
76- .build ()
77- );
78- final CountDownLatch concurrentMessagesLatch = new CountDownLatch (concurrencyLevel );
79- final MessageConsumer messageConsumer = new MessageConsumer (concurrentMessagesLatch );
80- final MessageResolver messageResolver = new IndividualMessageResolver (queueProperties , sqsAsyncClient );
81- final MessageProcessor messageProcessor = new DefaultMessageProcessor (
82- argumentResolverService ,
83- queueProperties ,
84- messageResolver ,
85- MessageConsumer .class .getMethod ("consume" , String .class ),
86- messageConsumer
87- );
88- final ConcurrentMessageBroker messageBroker = new ConcurrentMessageBroker (
89- messageRetriever ,
90- messageProcessor ,
91- Executors .newCachedThreadPool (),
92- StaticConcurrentMessageBrokerProperties .builder ()
93- .concurrencyLevel (concurrencyLevel )
94- .build ()
95- );
96- sendNumberOfMessages (concurrencyLevel , sqsAsyncClient , queueUrl );
97-
98- // act
99- messageBroker .start ();
100- concurrentMessagesLatch .await (60 , SECONDS );
101-
102- // cleanup
103- messageBroker .stop ().get (4 , SECONDS );
104-
105- // assert
106- assertThat (messageConsumer .numberOfTimesProcessed .get ()).isEqualTo (concurrencyLevel );
107- assertNoMessagesInQueue (sqsAsyncClient , queueUrl );
52+ argumentResolverService = new CoreArgumentResolverService (PAYLOAD_MAPPER , localSqsRule .getLocalAmazonSqsAsync ());
10853 }
10954
11055 @ Test
11156 public void allMessagesSentIntoQueueAreProcessed () throws Exception {
11257 // arrange
11358 final int concurrencyLevel = 10 ;
114- final int numberOfMessages = 300 ;
115- final QueueProperties queueProperties = QueueProperties .builder ().queueUrl (queueUrl ).build ();
59+ final int numberOfMessages = 100 ;
11660 final SqsAsyncClient sqsAsyncClient = localSqsRule .getLocalAmazonSqsAsync ();
11761 final MessageRetriever messageRetriever = new IndividualMessageRetriever (
11862 sqsAsyncClient ,
11963 queueProperties ,
120- IndividualMessageRetrieverProperties .builder ().visibilityTimeoutForMessagesInSeconds (1 ).build ()
64+ IndividualMessageRetrieverProperties .builder ()
65+ .visibilityTimeoutForMessagesInSeconds (30 )
66+ .build ()
12167 );
12268 final CountDownLatch messageReceivedLatch = new CountDownLatch (numberOfMessages );
123- final PayloadMapper payloadMapper = new JacksonPayloadMapper (OBJECT_MAPPER );
124- final ArgumentResolverService argumentResolverService = new CoreArgumentResolverService (payloadMapper , sqsAsyncClient );
12569 final MessageConsumer messageConsumer = new MessageConsumer (messageReceivedLatch );
12670 final MessageResolver messageResolver = new IndividualMessageResolver (queueProperties , sqsAsyncClient );
12771 final MessageProcessor messageProcessor = new DefaultMessageProcessor (
@@ -139,42 +83,38 @@ public void allMessagesSentIntoQueueAreProcessed() throws Exception {
13983 .concurrencyLevel (concurrencyLevel )
14084 .build ()
14185 );
142- sendNumberOfMessages (numberOfMessages , sqsAsyncClient , queueUrl );
86+ SqsIntegrationTestUtils . sendNumberOfMessages (numberOfMessages , sqsAsyncClient , queueUrl );
14387
14488 // act
14589 messageBroker .start ();
14690
14791 // assert
14892 messageReceivedLatch .await (60 , SECONDS );
149- assertThat (messageConsumer .numberOfTimesProcessed .get ()).isEqualTo (numberOfMessages );
15093
15194 // cleanup
15295 messageBroker .stop ().get (4 , SECONDS );
153- assertNoMessagesInQueue (sqsAsyncClient , queueUrl );
96+ SqsIntegrationTestUtils . assertNoMessagesInQueue (sqsAsyncClient , queueUrl );
15497 }
15598
15699 @ Test
157100 public void usingPrefetchingMessageRetrieverCanConsumeAllMessages () throws Exception {
158101 // arrange
159102 final int concurrencyLevel = 10 ;
160- final int numberOfMessages = 300 ;
161- final QueueProperties queueProperties = QueueProperties .builder ().queueUrl (queueUrl ).build ();
103+ final int numberOfMessages = 100 ;
162104 final SqsAsyncClient sqsAsyncClient = localSqsRule .getLocalAmazonSqsAsync ();
163105 final AsyncMessageRetriever messageRetriever = new PrefetchingMessageRetriever (
164106 sqsAsyncClient ,
165107 queueProperties ,
166108 StaticPrefetchingMessageRetrieverProperties
167109 .builder ()
168- .visibilityTimeoutForMessagesInSeconds (1 )
110+ .visibilityTimeoutForMessagesInSeconds (60 )
169111 .maxWaitTimeInSecondsToObtainMessagesFromServer (1 )
170112 .desiredMinPrefetchedMessages (30 )
171113 .maxPrefetchedMessages (40 )
172114 .build (),
173115 Executors .newCachedThreadPool ()
174116 );
175117 final CountDownLatch messageReceivedLatch = new CountDownLatch (numberOfMessages );
176- final PayloadMapper payloadMapper = new JacksonPayloadMapper (OBJECT_MAPPER );
177- final ArgumentResolverService argumentResolverService = new CoreArgumentResolverService (payloadMapper , sqsAsyncClient );
178118 final MessageConsumer messageConsumer = new MessageConsumer (messageReceivedLatch );
179119 final MessageResolver messageResolver = new IndividualMessageResolver (queueProperties , sqsAsyncClient );
180120 final MessageProcessor messageProcessor = new DefaultMessageProcessor (
@@ -192,27 +132,25 @@ public void usingPrefetchingMessageRetrieverCanConsumeAllMessages() throws Excep
192132 .concurrencyLevel (concurrencyLevel )
193133 .build ()
194134 );
195- sendNumberOfMessages (numberOfMessages , sqsAsyncClient , queueUrl );
135+ SqsIntegrationTestUtils . sendNumberOfMessages (numberOfMessages , sqsAsyncClient , queueUrl );
196136 messageRetriever .start ();
197137
198138 // act
199139 messageBroker .start ();
200140
201141 // assert
202142 messageReceivedLatch .await (1 , MINUTES );
203- assertThat (messageConsumer .numberOfTimesProcessed .get ()).isEqualTo (numberOfMessages );
204143
205144 // cleanup
206145 messageRetriever .stop ().get (5 , SECONDS );
207146 log .debug ("Stopped message retriever" );
208147 messageBroker .stop ().get (10 , SECONDS );
209- assertNoMessagesInQueue (sqsAsyncClient , queueUrl );
148+ SqsIntegrationTestUtils . assertNoMessagesInQueue (sqsAsyncClient , queueUrl );
210149 }
211150
212151 @ SuppressWarnings ("WeakerAccess" )
213152 public static class MessageConsumer {
214153 private final CountDownLatch messagesReceivedLatch ;
215- private final AtomicInteger numberOfTimesProcessed = new AtomicInteger (0 );
216154
217155 public MessageConsumer (final CountDownLatch messagesReceivedLatch ) {
218156 this .messagesReceivedLatch = messagesReceivedLatch ;
@@ -221,7 +159,6 @@ public MessageConsumer(final CountDownLatch messagesReceivedLatch) {
221159 @ SuppressWarnings ("unused" )
222160 public void consume (@ Payload final String messagePayload ) {
223161 log .info ("Consuming message: {}" , messagePayload );
224- numberOfTimesProcessed .incrementAndGet ();
225162 messagesReceivedLatch .countDown ();
226163 }
227164 }
0 commit comments