Skip to content

Commit e272bff

Browse files
refs #77: Moved MessageListenerContainer to API as this is not spring specific
1 parent cc940cb commit e272bff

File tree

19 files changed

+240
-132
lines changed

19 files changed

+240
-132
lines changed
Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
package com.jashmore.sqs.spring.container;
1+
package com.jashmore.sqs.container;
22

33
import com.jashmore.sqs.broker.MessageBroker;
44
import com.jashmore.sqs.retriever.AsyncMessageRetriever;
5-
import com.jashmore.sqs.spring.QueueContainerService;
65

76
import javax.annotation.concurrent.ThreadSafe;
87

@@ -13,21 +12,10 @@
1312
* may only need to call down to the underlying {@link MessageBroker#start()} but there could be more complicated actions needing
1413
* to be made, such as starting an {@link AsyncMessageRetriever}.
1514
*
16-
* <p>These containers must be thread safe as there could be multiple threads starting and stopping these containers using the
17-
* {@link QueueContainerService}.
15+
* <p>These containers must be thread safe as there could be multiple threads starting and stopping these containers.
1816
*/
1917
@ThreadSafe
2018
public interface MessageListenerContainer {
21-
/**
22-
* The unique identifier for this container which should not be the same as any other container.
23-
*
24-
* <p>For the default implementations provided by the core Spring Starter the unique identifier is the URL for the queue
25-
* and therefore it isn't possible two different methods call the same queue.
26-
*
27-
* @return the unique identifier
28-
*/
29-
String getIdentifier();
30-
3119
/**
3220
* Start processing messages for a queue by starting any necessary dependencies internally.
3321
*
Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.jashmore.sqs.spring.container;
1+
package com.jashmore.sqs.container;
22

33
import com.google.common.annotations.VisibleForTesting;
44

@@ -8,7 +8,6 @@
88
import com.jashmore.sqs.resolver.MessageResolver;
99
import com.jashmore.sqs.retriever.AsyncMessageRetriever;
1010
import com.jashmore.sqs.retriever.MessageRetriever;
11-
import com.jashmore.sqs.spring.QueueContainerService;
1211
import lombok.extern.slf4j.Slf4j;
1312

1413
import java.util.concurrent.ExecutionException;
@@ -23,16 +22,6 @@
2322
*/
2423
@Slf4j
2524
public class SimpleMessageListenerContainer implements MessageListenerContainer {
26-
/**
27-
* The identifier for this container.
28-
*
29-
* <p>This identifier must be unique across all other containers so that it can be uniquely obtained to start
30-
* or stop specifically.
31-
*
32-
* @see QueueContainerService#startContainer(String) for usage of this identifier
33-
* @see QueueContainerService#stopContainer(String) for usage of this identifier
34-
*/
35-
private final String identifier;
3625

3726
/**
3827
* The {@link MessageRetriever} that will be used in this container to obtain messages to process.
@@ -73,16 +62,13 @@ public class SimpleMessageListenerContainer implements MessageListenerContainer
7362
* Container that can be built when the {@link MessageBroker} is using an {@link AsyncMessageRetriever}. This takes the {@link AsyncMessageRetriever} so
7463
* that during the lifecycle of the spring container, it can be enabled and disabled at the same time that the {@link MessageBroker} is.
7564
*
76-
* @param identifier the unique identifier for this container
7765
* @param messageRetriever the message retriever for this listener
7866
* @param messageBroker the message broker that handles the processing of messages
7967
* @param messageResolver the message resolver that will be used in this container
8068
*/
81-
public SimpleMessageListenerContainer(final String identifier,
82-
final MessageRetriever messageRetriever,
69+
public SimpleMessageListenerContainer(final MessageRetriever messageRetriever,
8370
final MessageBroker messageBroker,
8471
final MessageResolver messageResolver) {
85-
this.identifier = identifier;
8672
this.messageRetriever = messageRetriever;
8773
this.messageBroker = messageBroker;
8874
this.messageResolver = messageResolver;
@@ -92,12 +78,10 @@ public SimpleMessageListenerContainer(final String identifier,
9278
}
9379

9480
@VisibleForTesting
95-
SimpleMessageListenerContainer(final String identifier,
96-
final MessageRetriever messageRetriever,
81+
SimpleMessageListenerContainer(final MessageRetriever messageRetriever,
9782
final MessageBroker messageBroker,
9883
final MessageResolver messageResolver,
9984
final ExecutorService executorService) {
100-
this.identifier = identifier;
10185
this.messageRetriever = messageRetriever;
10286
this.messageBroker = messageBroker;
10387
this.messageResolver = messageResolver;
@@ -106,11 +90,6 @@ public SimpleMessageListenerContainer(final String identifier,
10690
this.messageResolverCompletableFuture = null;
10791
}
10892

109-
@Override
110-
public String getIdentifier() {
111-
return identifier;
112-
}
113-
11493
@Override
11594
public synchronized void start() {
11695
if (isRunning) {
@@ -142,7 +121,6 @@ public synchronized void stop() {
142121
((AsyncMessageRetriever)messageRetriever).stop().get();
143122
}
144123

145-
// TODO: All of the tests for these
146124
if (messageResolverCompletableFuture != null) {
147125
messageResolverCompletableFuture.cancel(true);
148126
}
Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
package com.jashmore.sqs.spring.container;
1+
package com.jashmore.sqs.container;
22

3-
import static org.assertj.core.api.Assertions.assertThat;
43
import static org.mockito.Mockito.doReturn;
54
import static org.mockito.Mockito.never;
65
import static org.mockito.Mockito.times;
@@ -31,7 +30,7 @@ public class SimpleMessageListenerContainerTest {
3130

3231
@Mock
3332
private MessageBroker messageBroker;
34-
33+
3534
@Mock
3635
private MessageResolver messageResolver;
3736

@@ -56,19 +55,10 @@ public void setUp() {
5655
when(asyncMessageRetriever.stop()).thenReturn(asyncMessageRetrieverStoppedFuture);
5756
}
5857

59-
@Test
60-
public void identifierPassedIsTheIdentifierForTheContainer() {
61-
// arrange
62-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever, messageBroker, messageResolver);
63-
64-
// assert
65-
assertThat(container.getIdentifier()).isEqualTo("identifier");
66-
}
67-
6858
@Test
6959
public void onStartTheMessageBrokerIsStarted() {
7060
// arrange
71-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever, messageBroker, messageResolver);
61+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever, messageBroker, messageResolver);
7262

7363
// act
7464
container.start();
@@ -80,7 +70,7 @@ public void onStartTheMessageBrokerIsStarted() {
8070
@Test
8171
public void whenContainerIsAlreadyStartedTheMessageBrokerAndMessageRetrieverAreNotStartedAgain() {
8272
// arrange
83-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever, messageBroker, messageResolver);
73+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever, messageBroker, messageResolver);
8474

8575
// act
8676
container.start();
@@ -94,7 +84,7 @@ public void whenContainerIsAlreadyStartedTheMessageBrokerAndMessageRetrieverAreN
9484
@Test
9585
public void stoppingContainerWhenNotRunningDoesNothing() {
9686
// arrange
97-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever, messageBroker, messageResolver);
87+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever, messageBroker, messageResolver);
9888

9989
// act
10090
container.stop();
@@ -106,7 +96,7 @@ public void stoppingContainerWhenNotRunningDoesNothing() {
10696
@Test
10797
public void stoppingContainerWhenRunningWillStopAsyncMessageRetrievers() {
10898
// arrange
109-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever, messageBroker, messageResolver);
99+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever, messageBroker, messageResolver);
110100
container.start();
111101

112102
// act
@@ -119,7 +109,7 @@ public void stoppingContainerWhenRunningWillStopAsyncMessageRetrievers() {
119109
@Test
120110
public void stoppingContainerWhenRunningWillStopTheMessageBroker() {
121111
// arrange
122-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever, messageBroker, messageResolver);
112+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever, messageBroker, messageResolver);
123113
container.start();
124114

125115
// act
@@ -132,7 +122,7 @@ public void stoppingContainerWhenRunningWillStopTheMessageBroker() {
132122
@Test
133123
public void stoppingContainerWhenRunningWillWaitUntilMessageBrokerIsStopped() throws InterruptedException, ExecutionException {
134124
// arrange
135-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever, messageBroker, messageResolver);
125+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever, messageBroker, messageResolver);
136126
container.start();
137127

138128
// act
@@ -145,7 +135,7 @@ public void stoppingContainerWhenRunningWillWaitUntilMessageBrokerIsStopped() th
145135
@Test
146136
public void stoppingContainerWhenRunningWillWaitUntilAsyncMessageRetrieverIsStopped() throws InterruptedException, ExecutionException {
147137
// arrange
148-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever, messageBroker, messageResolver);
138+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever, messageBroker, messageResolver);
149139
container.start();
150140

151141
// act
@@ -158,7 +148,7 @@ public void stoppingContainerWhenRunningWillWaitUntilAsyncMessageRetrieverIsStop
158148
@Test
159149
public void exceptionThrownWhileStoppingAsyncMessageRetrieverWillNotBubbleException() throws InterruptedException, ExecutionException {
160150
// arrange
161-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever, messageBroker, messageResolver);
151+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever, messageBroker, messageResolver);
162152
container.start();
163153
when(asyncMessageRetrieverStoppedFuture.get()).thenThrow(new ExecutionException("test", new IllegalArgumentException()));
164154

@@ -170,7 +160,7 @@ public void exceptionThrownWhileStoppingAsyncMessageRetrieverWillNotBubbleExcept
170160
@Test
171161
public void exceptionThrownWhileStoppingMessageBrokerWillNotBubbleException() throws InterruptedException, ExecutionException {
172162
// arrange
173-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever, messageBroker, messageResolver);
163+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever, messageBroker, messageResolver);
174164
container.start();
175165
when(messageBrokerStoppedFuture.get()).thenThrow(new ExecutionException("test", new IllegalArgumentException()));
176166

@@ -181,7 +171,7 @@ public void exceptionThrownWhileStoppingMessageBrokerWillNotBubbleException() th
181171
@Test
182172
public void stoppingAlreadyStoppedContainerWillDoNothing() {
183173
// arrange
184-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever, messageBroker, messageResolver);
174+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever, messageBroker, messageResolver);
185175
container.start();
186176

187177
// act
@@ -196,7 +186,7 @@ public void stoppingAlreadyStoppedContainerWillDoNothing() {
196186
@Test
197187
public void asyncMessageResolverWillBeStartedOnBackgroundThreadWhenStartCalled() {
198188
// arrange
199-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever,
189+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever,
200190
messageBroker, asyncMessageResolver, executorService);
201191

202192
// act
@@ -209,7 +199,7 @@ public void asyncMessageResolverWillBeStartedOnBackgroundThreadWhenStartCalled()
209199
@Test
210200
public void whenAsyncMessageResolverStartedItWillCancelThreadWhenContainerIsStopped() {
211201
// arrange
212-
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer("identifier", asyncMessageRetriever,
202+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(asyncMessageRetriever,
213203
messageBroker, asyncMessageResolver, executorService);
214204
doReturn(messageResolverThreadFuture).when(executorService).submit(asyncMessageResolver);
215205
container.start();

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222
<artifactId>java-dynamic-sqs-listener-api</artifactId>
2323
<version>${project.version}</version>
2424
</dependency>
25+
26+
<dependency>
27+
<groupId>org.projectlombok</groupId>
28+
<artifactId>lombok</artifactId>
29+
</dependency>
2530
</dependencies>
2631

2732
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.jashmore.sqs.spring;
2+
3+
import com.jashmore.sqs.container.MessageListenerContainer;
4+
import lombok.Builder;
5+
import lombok.Value;
6+
import lombok.experimental.NonFinal;
7+
8+
/**
9+
* Defines a {@link MessageListenerContainer} with a provided identifier that can be used by the {@link QueueContainerService} to start and stop them during
10+
* execution.
11+
*/
12+
@Value
13+
@NonFinal
14+
@Builder
15+
public class IdentifiableMessageListenerContainer {
16+
/**
17+
* The unique identifier for this container which should not be the same as any other container.
18+
*
19+
* <p>For the default implementations provided by the core Spring Starter the unique identifier is the URL for the queue
20+
* and therefore it isn't possible two different methods call the same queue.
21+
*/
22+
private String identifier;
23+
/**
24+
* The container that wraps a method and is identifiable by the {@link #identifier}.
25+
*/
26+
private MessageListenerContainer container;
27+
}

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/QueueWrapper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.jashmore.sqs.spring;
22

3-
import com.jashmore.sqs.spring.container.MessageListenerContainer;
3+
import com.jashmore.sqs.container.MessageListenerContainer;
44

55
import java.lang.reflect.Method;
66

@@ -20,11 +20,11 @@ public interface QueueWrapper {
2020
boolean canWrapMethod(Method method);
2121

2222
/**
23-
* Wrap a method with a {@link MessageListenerContainer} that will handle the messages being processed.
23+
* Wrap a method with a {@link MessageListenerContainer} that will handle the messages being processed and identify it with a unique identifier.
2424
*
2525
* @param bean the specific bean for this method
2626
* @param method the method of the bean that will be run for each message
2727
* @return the container that will wrap this method
2828
*/
29-
MessageListenerContainer wrapMethod(Object bean, Method method);
29+
IdentifiableMessageListenerContainer wrapMethod(Object bean, Method method);
3030
}

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/AbstractQueueAnnotationWrapper.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.jashmore.sqs.spring;
22

3-
import com.jashmore.sqs.spring.container.MessageListenerContainer;
43
import com.jashmore.sqs.util.annotation.AnnotationUtils;
54

65
import java.lang.annotation.Annotation;
@@ -20,7 +19,7 @@ public boolean canWrapMethod(final Method method) {
2019
}
2120

2221
@Override
23-
public MessageListenerContainer wrapMethod(final Object bean, final Method method) {
22+
public IdentifiableMessageListenerContainer wrapMethod(final Object bean, final Method method) {
2423
final T annotation = AnnotationUtils.findMethodAnnotation(method, getAnnotationClass())
2524
.orElseThrow(() -> new RuntimeException("Trying to wrap method that does not contain annotation: @" + getAnnotationClass().getSimpleName()));
2625

@@ -42,5 +41,5 @@ public MessageListenerContainer wrapMethod(final Object bean, final Method metho
4241
* @param annotation the annotation found on the method containing details about this listener
4342
* @return the container that wraps the method for usage by the queue listeners
4443
*/
45-
protected abstract MessageListenerContainer wrapMethodContainingAnnotation(final Object bean, final Method method, T annotation);
44+
protected abstract IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(final Object bean, final Method method, T annotation);
4645
}

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/DefaultQueueContainerService.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import com.google.common.collect.ImmutableMap;
77
import com.google.common.collect.ImmutableSet;
88

9-
import com.jashmore.sqs.spring.container.MessageListenerContainer;
9+
import com.jashmore.sqs.container.MessageListenerContainer;
1010
import lombok.extern.slf4j.Slf4j;
1111
import org.springframework.beans.BeansException;
1212
import org.springframework.context.ApplicationContext;
@@ -68,12 +68,6 @@ public DefaultQueueContainerService(final List<QueueWrapper> queueWrappers) {
6868
this.queueWrappers = queueWrappers;
6969
}
7070

71-
@VisibleForTesting
72-
DefaultQueueContainerService(final ExecutorService executorService, final List<QueueWrapper> queueWrappers) {
73-
this.executorService = executorService;
74-
this.queueWrappers = queueWrappers;
75-
}
76-
7771
/**
7872
* Initialise all of the containers for this application by finding all bean methods that need to be wrapped.
7973
*/
@@ -97,13 +91,13 @@ public synchronized void setApplicationContext(@Nonnull final ApplicationContext
9791
for (final Method method : bean.getClass().getMethods()) {
9892
for (final QueueWrapper annotationProcessor : queueWrappers) {
9993
if (annotationProcessor.canWrapMethod(method)) {
100-
final MessageListenerContainer messageListenerContainer = annotationProcessor.wrapMethod(bean, method);
101-
if (messageContainers.containsKey(messageListenerContainer.getIdentifier())) {
94+
final IdentifiableMessageListenerContainer identifiableMessageListenerContainer = annotationProcessor.wrapMethod(bean, method);
95+
if (messageContainers.containsKey(identifiableMessageListenerContainer.getIdentifier())) {
10296
throw new IllegalStateException("Created two MessageListenerContainers with the same identifier: "
103-
+ messageListenerContainer.getIdentifier());
97+
+ identifiableMessageListenerContainer.getIdentifier());
10498
}
105-
log.debug("Created MessageListenerContainer with id: {}", messageListenerContainer.getIdentifier());
106-
messageContainers.put(messageListenerContainer.getIdentifier(), messageListenerContainer);
99+
log.debug("Created MessageListenerContainer with id: {}", identifiableMessageListenerContainer.getIdentifier());
100+
messageContainers.put(identifiableMessageListenerContainer.getIdentifier(), identifiableMessageListenerContainer.getContainer());
107101
}
108102
}
109103
}

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/config/QueueListenerConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
import com.jashmore.sqs.argument.payload.mapper.JacksonPayloadMapper;
1010
import com.jashmore.sqs.argument.payload.mapper.PayloadMapper;
1111
import com.jashmore.sqs.argument.visibility.VisibilityExtenderArgumentResolver;
12+
import com.jashmore.sqs.container.MessageListenerContainer;
1213
import com.jashmore.sqs.spring.DefaultQueueContainerService;
1314
import com.jashmore.sqs.spring.QueueContainerService;
1415
import com.jashmore.sqs.spring.QueueWrapper;
15-
import com.jashmore.sqs.spring.container.MessageListenerContainer;
1616
import com.jashmore.sqs.spring.container.basic.QueueListenerWrapper;
1717
import com.jashmore.sqs.spring.container.batching.BatchingQueueListenerWrapper;
1818
import com.jashmore.sqs.spring.container.prefetch.PrefetchingQueueListenerWrapper;

0 commit comments

Comments
 (0)