Skip to content

Commit 5a4052c

Browse files
refs #79: Allow QueueContainerService to be dependency injected (#82)
* refs #79: Allow QueueContainerService to be dependency injected * refs #79: Fix checkstyle problems * refs #79: Clean up bug in ConcurrentMessageBroker and BatchingMessageResolver
1 parent 208f27c commit 5a4052c

File tree

9 files changed

+178
-101
lines changed

9 files changed

+178
-101
lines changed

examples/java-dynamic-sqs-listener-spring-starter-examples/src/main/java/com/jashmore/sqs/examples/ScheduledQueueListenerEnabler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,14 @@
33
import com.jashmore.sqs.spring.QueueContainerService;
44
import lombok.RequiredArgsConstructor;
55
import lombok.extern.slf4j.Slf4j;
6-
import org.springframework.beans.factory.BeanFactory;
76
import org.springframework.scheduling.annotation.Scheduled;
87
import org.springframework.stereotype.Component;
98

109
@Slf4j
1110
@Component
1211
@RequiredArgsConstructor
1312
public class ScheduledQueueListenerEnabler {
14-
// TODO: #79: Allow for dependency injection of QueueContainerService
15-
private final BeanFactory beanFactory;
13+
private final QueueContainerService queueContainerService;
1614

1715
/**
1816
* Just a scheduled job that shows that there are methods of turning off the container when necessary.
@@ -23,7 +21,6 @@ public class ScheduledQueueListenerEnabler {
2321
public void turnOfSqsListener() throws InterruptedException {
2422
log.info("Turning off SQS Listener for a short period");
2523

26-
final QueueContainerService queueContainerService = beanFactory.getBean(QueueContainerService.class);
2724
queueContainerService.stopContainer("test");
2825
Thread.sleep(5_000);
2926
log.info("Turning SQS Listener back om");

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/concurrent/ConcurrentMessageBroker.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,6 @@ public class ConcurrentMessageBroker implements MessageBroker {
2323
private final MessageProcessor messageProcessor;
2424
private final ConcurrentMessageBrokerProperties properties;
2525

26-
/**
27-
* Semaphore used to control the number of threads that are available to be run.
28-
*
29-
* <p>This is set to zero but will be replaced by the value from the {@link #properties}.
30-
*/
31-
private final ResizableSemaphore concurrentMessagesBeingProcessedSemaphore = new ResizableSemaphore(0);
32-
3326
public ConcurrentMessageBroker(final MessageRetriever messageRetriever,
3427
final MessageProcessor messageProcessor,
3528
final ConcurrentMessageBrokerProperties properties) {
@@ -40,7 +33,7 @@ public ConcurrentMessageBroker(final MessageRetriever messageRetriever,
4033

4134
/**
4235
* RV_RETURN_VALUE_IGNORED_BAD_PRACTICE is ignored because we don't actually care about the return future for submitting a thread to process a message.
43-
* Instead the {@link #concurrentMessagesBeingProcessedSemaphore} is used to control the number of concurrent threads and when we should down we
36+
* Instead the {@link ResizableSemaphore} is used to control the number of concurrent threads and when we should down we
4437
* wait for the whole {@link ExecutorService} to finish and therefore we don't care about an individual thread.
4538
*/
4639
@Override
@@ -94,8 +87,8 @@ public void run() {
9487
private void updateConcurrencyLevelIfChanged(final ResizableSemaphore resizableSemaphore) {
9588
final int newConcurrencyLevel = properties.getConcurrencyLevel();
9689

97-
if (concurrentMessagesBeingProcessedSemaphore.getMaximumPermits() != newConcurrencyLevel) {
98-
log.debug("Changing concurrency from {} to {}", concurrentMessagesBeingProcessedSemaphore.getMaximumPermits(), newConcurrencyLevel);
90+
if (resizableSemaphore.getMaximumPermits() != newConcurrencyLevel) {
91+
log.debug("Changing concurrency from {} to {}", resizableSemaphore.getMaximumPermits(), newConcurrencyLevel);
9992
resizableSemaphore.changePermitSize(newConcurrencyLevel);
10093
}
10194
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/resolver/batching/BatchingMessageResolver.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,14 @@ public CompletableFuture<?> resolveMessage(final Message message) {
8181

8282
@Override
8383
public void run() {
84-
while (!Thread.currentThread().isInterrupted()) {
84+
boolean continueProcessing = true;
85+
while (continueProcessing) {
8586
final List<MessageResolutionBean> batchOfMessagesToResolve = new LinkedList<>();
8687
try {
8788
Queues.drain(messagesToBeResolved, batchOfMessagesToResolve, getBatchSize(), getBufferingTimeInMs(), TimeUnit.MILLISECONDS);
8889
} catch (final InterruptedException interruptedException) {
8990
// Do nothing, we still want to send the current batch of messages
91+
continueProcessing = false;
9092
}
9193

9294
if (!batchOfMessagesToResolve.isEmpty()) {

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/batching/BatchingMessageRetriever.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,15 @@ public void run() {
121121
log.debug("Requesting {} messages", numberOfMessagesToObtain);
122122

123123
try {
124+
final ReceiveMessageResponse response;
124125
try {
125-
final ReceiveMessageResponse response = sqsAsyncClient.receiveMessage(buildReceiveMessageRequest(numberOfMessagesToObtain))
126+
response = sqsAsyncClient.receiveMessage(buildReceiveMessageRequest(numberOfMessagesToObtain))
126127
.get();
128+
} catch (final InterruptedException interruptedException) {
129+
log.debug("Thread interrupted while obtaining messages from SQS");
130+
break;
131+
}
132+
try {
127133
for (final Message message : response.messages()) {
128134
messagesDownloaded.put(message);
129135
}
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package com.jashmore.sqs.spring;
22

3-
import static java.util.stream.Collectors.toList;
4-
53
import com.google.common.annotations.VisibleForTesting;
4+
import com.google.common.base.Suppliers;
65
import com.google.common.collect.ImmutableMap;
76
import com.google.common.collect.ImmutableSet;
87

@@ -19,14 +18,12 @@
1918
import java.util.Map;
2019
import java.util.Optional;
2120
import java.util.Set;
21+
import java.util.concurrent.CompletableFuture;
2222
import java.util.concurrent.ExecutionException;
23-
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.Executors;
25-
import java.util.concurrent.Future;
2623
import java.util.concurrent.atomic.AtomicBoolean;
2724
import java.util.function.Consumer;
25+
import java.util.function.Supplier;
2826
import javax.annotation.Nonnull;
29-
import javax.annotation.concurrent.GuardedBy;
3027
import javax.annotation.concurrent.ThreadSafe;
3128

3229
/**
@@ -38,116 +35,84 @@
3835
@Slf4j
3936
@ThreadSafe
4037
public class DefaultQueueContainerService implements QueueContainerService, ApplicationContextAware, SmartLifecycle {
41-
/**
42-
* Used to be able to start and stop containers concurrently.
43-
*/
44-
private final ExecutorService executorService;
45-
4638
/**
4739
* These {@link QueueWrapper}s should be injected by the spring application and therefore to add more wrappers into the system a corresponding bean
4840
* with this interface must be included in the application.
4941
*/
5042
private final List<QueueWrapper> queueWrappers;
5143

5244
/**
53-
* This contains all of the containers that have been created from wrapping the Spring Application's bean's methods.
45+
* This contains a supplier that can obtain all of the {@link MessageListenerContainer}s that have been built for this application.
5446
*
55-
* <p>This is only modified via the {@link #setApplicationContext(ApplicationContext)} method, which will only be called during the lifecycle of the spring
56-
* application. This method protects from multiple calls to setting this application context so this will maintain its thread safety.
47+
* <p>This must be contained within a {@link Supplier} because the {@link QueueContainerService} can be dependency injected into the application and
48+
* because the construction of these containers needs to look at all beans there can be a cyclic dependency.
5749
*/
58-
@GuardedBy("this")
59-
private Map<String, MessageListenerContainer> containers = null;
50+
private Supplier<Map<String, MessageListenerContainer>> containersLazilyLoaded;
6051

6152
/**
6253
* Determines whether this container service is currently running in the Spring lifecycle.
6354
*/
6455
private AtomicBoolean isRunning = new AtomicBoolean(false);
6556

6657
public DefaultQueueContainerService(final List<QueueWrapper> queueWrappers) {
67-
this.executorService = Executors.newCachedThreadPool();
6858
this.queueWrappers = queueWrappers;
6959
}
7060

71-
/**
72-
* Initialise all of the containers for this application by finding all bean methods that need to be wrapped.
73-
*/
7461
@Override
75-
public synchronized void setApplicationContext(@Nonnull final ApplicationContext applicationContext) throws BeansException {
76-
if (containers != null) {
77-
log.warn("Trying to set application context when already set up previously");
78-
return;
79-
}
80-
81-
if (queueWrappers.isEmpty()) {
82-
containers = ImmutableMap.of();
83-
return;
84-
}
85-
86-
log.debug("Initialising QueueContainerService...");
87-
final Map<String, MessageListenerContainer> messageContainers = new HashMap<>();
88-
89-
for (final String beanName : applicationContext.getBeanDefinitionNames()) {
90-
final Object bean = applicationContext.getBean(beanName);
91-
for (final Method method : bean.getClass().getMethods()) {
92-
for (final QueueWrapper annotationProcessor : queueWrappers) {
93-
if (annotationProcessor.canWrapMethod(method)) {
94-
final IdentifiableMessageListenerContainer identifiableMessageListenerContainer = annotationProcessor.wrapMethod(bean, method);
95-
if (messageContainers.containsKey(identifiableMessageListenerContainer.getIdentifier())) {
96-
throw new IllegalStateException("Created two MessageListenerContainers with the same identifier: "
97-
+ identifiableMessageListenerContainer.getIdentifier());
98-
}
99-
log.debug("Created MessageListenerContainer with id: {}", identifiableMessageListenerContainer.getIdentifier());
100-
messageContainers.put(identifiableMessageListenerContainer.getIdentifier(), identifiableMessageListenerContainer.getContainer());
101-
}
102-
}
103-
}
104-
}
105-
106-
this.containers = ImmutableMap.copyOf(messageContainers);
62+
public void setApplicationContext(@Nonnull final ApplicationContext applicationContext) throws BeansException {
63+
containersLazilyLoaded = Suppliers.memoize(() -> calculateMessageListenerContainers(queueWrappers, applicationContext));
10764
}
10865

10966
@Override
110-
public synchronized void startAllContainers() {
67+
public void startAllContainers() {
11168
runForAllContainers(MessageListenerContainer::start);
11269
}
11370

11471
@Override
115-
public synchronized void startContainer(final String queueIdentifier) {
72+
public void startContainer(final String queueIdentifier) {
11673
runForQueue(queueIdentifier, MessageListenerContainer::start);
11774
}
11875

11976
@Override
120-
public synchronized void stopAllContainers() {
77+
public void stopAllContainers() {
12178
runForAllContainers(MessageListenerContainer::stop);
12279
}
12380

12481
@Override
125-
public synchronized void stopContainer(final String queueIdentifier) {
82+
public void stopContainer(final String queueIdentifier) {
12683
runForQueue(queueIdentifier, MessageListenerContainer::stop);
12784
}
12885

86+
/**
87+
* For each of the containers run the following {@link Consumer} asynchronously and wait for them all to finish.
88+
*
89+
* @param containerConsumer the consumer to call
90+
*/
12991
private void runForAllContainers(final Consumer<MessageListenerContainer> containerConsumer) {
130-
final List<? extends Future<?>> taskFutures = containers.values().stream()
131-
.map(container -> executorService.submit(() -> containerConsumer.accept(container)))
132-
.collect(toList());
133-
134-
for (final Future<?> future : taskFutures) {
135-
try {
136-
future.get();
137-
} catch (InterruptedException interruptedException) {
138-
log.warn("Thread interrupted while running command across all containers");
139-
return;
140-
} catch (ExecutionException executionException) {
141-
log.error("Error running command on container", executionException);
142-
}
92+
final CompletableFuture<?>[] allTaskCompletableFutures = containersLazilyLoaded.get().values().stream()
93+
.map(container -> CompletableFuture.runAsync(() -> containerConsumer.accept(container)))
94+
.toArray(CompletableFuture[]::new);
95+
96+
try {
97+
CompletableFuture.allOf(allTaskCompletableFutures).get();
98+
} catch (InterruptedException interruptedException) {
99+
log.warn("Thread interrupted while running command across all containers");
100+
} catch (ExecutionException executionException) {
101+
log.error("Error running command on container", executionException);
143102
}
144103
}
145104

146-
private void runForQueue(final String queueIdentifier, Consumer<MessageListenerContainer> runnable) {
147-
final MessageListenerContainer container = Optional.ofNullable(containers.get(queueIdentifier))
105+
/**
106+
* For the given queue with the identifier run the following {@link Consumer} for the container and wait until it is finished.
107+
*
108+
* @param queueIdentifier the identifier of the queue
109+
* @param containerConsumer the container consumer to run
110+
*/
111+
private void runForQueue(final String queueIdentifier, final Consumer<MessageListenerContainer> containerConsumer) {
112+
final MessageListenerContainer container = Optional.ofNullable(containersLazilyLoaded.get().get(queueIdentifier))
148113
.orElseThrow(() -> new IllegalArgumentException("No container with the provided identifier"));
149114

150-
runnable.accept(container);
115+
containerConsumer.accept(container);
151116
}
152117

153118
@Override
@@ -191,6 +156,39 @@ public synchronized int getPhase() {
191156

192157
@VisibleForTesting
193158
synchronized Set<MessageListenerContainer> getContainers() {
194-
return ImmutableSet.copyOf(containers.values());
159+
return ImmutableSet.copyOf(containersLazilyLoaded.get().values());
160+
}
161+
162+
/**
163+
* Initialise all of the containers for this application by finding all bean methods that need to be wrapped.
164+
*/
165+
private static Map<String, MessageListenerContainer> calculateMessageListenerContainers(
166+
@Nonnull final List<QueueWrapper> queueWrappers,
167+
@Nonnull final ApplicationContext applicationContext) {
168+
if (queueWrappers.isEmpty()) {
169+
return ImmutableMap.of();
170+
}
171+
172+
log.debug("Initialising QueueContainerService...");
173+
final Map<String, MessageListenerContainer> messageContainers = new HashMap<>();
174+
175+
for (final String beanName : applicationContext.getBeanDefinitionNames()) {
176+
final Object bean = applicationContext.getBean(beanName);
177+
for (final Method method : bean.getClass().getMethods()) {
178+
for (final QueueWrapper annotationProcessor : queueWrappers) {
179+
if (annotationProcessor.canWrapMethod(method)) {
180+
final IdentifiableMessageListenerContainer identifiableMessageListenerContainer = annotationProcessor.wrapMethod(bean, method);
181+
if (messageContainers.containsKey(identifiableMessageListenerContainer.getIdentifier())) {
182+
throw new IllegalStateException("Created two MessageListenerContainers with the same identifier: "
183+
+ identifiableMessageListenerContainer.getIdentifier());
184+
}
185+
log.debug("Created MessageListenerContainer with id: {}", identifiableMessageListenerContainer.getIdentifier());
186+
messageContainers.put(identifiableMessageListenerContainer.getIdentifier(), identifiableMessageListenerContainer.getContainer());
187+
}
188+
}
189+
}
190+
}
191+
192+
return ImmutableMap.copyOf(messageContainers);
195193
}
196194
}

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/test/java/com/jashmore/sqs/spring/DefaultQueueContainerServiceTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void methodsThatAreNotEligibleForWrappingWillNotCreateMessageListeners()
114114
}
115115

116116
@Test
117-
public void duplicateMessageListenerContainsThrowsExceptionOnInitialisation() throws NoSuchMethodException {
117+
public void duplicateMessageListenerContainsThrowsExceptionWhenStarting() throws NoSuchMethodException {
118118
// arrange
119119
final BeanWithTwoMethods bean = new BeanWithTwoMethods();
120120
final Method methodOne = bean.getClass().getMethod("methodOne");
@@ -136,9 +136,10 @@ public void duplicateMessageListenerContainsThrowsExceptionOnInitialisation() th
136136
when(applicationContext.getBeanDefinitionNames()).thenReturn(new String[] { "bean" });
137137
when(applicationContext.getBean("bean")).thenReturn(bean);
138138
expectedException.expect(IllegalStateException.class);
139+
defaultQueueContainerService.setApplicationContext(applicationContext);
139140

140141
// act
141-
defaultQueueContainerService.setApplicationContext(applicationContext);
142+
defaultQueueContainerService.startAllContainers();
142143
}
143144

144145
@Test

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/test/java/it/com/jashmore/sqs/container/basic/QueueListenerWrapperIntegrationTest.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,16 @@
3535
@SpringBootTest(classes = {Application.class, QueueListenerWrapperIntegrationTest.TestConfig.class}, webEnvironment = RANDOM_PORT)
3636
@RunWith(SpringRunner.class)
3737
public class QueueListenerWrapperIntegrationTest {
38+
private static final String QUEUE_NAME = "QueueListenerWrapperIntegrationTest";
39+
3840
private static final int NUMBER_OF_MESSAGES_TO_SEND = 100;
3941
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(NUMBER_OF_MESSAGES_TO_SEND);
4042

4143
private static final Map<String, Boolean> messagesProcessed = new ConcurrentHashMap<>();
4244

4345
@ClassRule
4446
public static final LocalSqsRule LOCAL_SQS_RULE = new LocalSqsRule(ImmutableList.of(
45-
SqsQueuesConfig.QueueConfig.builder().queueName("QueueListenerWrapperIntegrationTest").build()
47+
SqsQueuesConfig.QueueConfig.builder().queueName(QUEUE_NAME).build()
4648
));
4749

4850
@Rule
@@ -53,20 +55,20 @@ public class QueueListenerWrapperIntegrationTest {
5355

5456
@Configuration
5557
public static class TestConfig {
58+
@Bean
59+
public LocalSqsAsyncClient localSqsAsyncClient() {
60+
return LOCAL_SQS_RULE.getLocalAmazonSqsAsync();
61+
}
62+
5663
@Service
5764
public static class MessageListener {
58-
@QueueListener(value = "QueueListenerWrapperIntegrationTest")
65+
@QueueListener(value = QUEUE_NAME)
5966
public void listenToMessage(@Payload final String payload) {
6067
log.info("Obtained message: {}", payload);
6168
messagesProcessed.put(payload, true);
6269
COUNT_DOWN_LATCH.countDown();
6370
}
6471
}
65-
66-
@Bean
67-
public LocalSqsAsyncClient localSqsAsyncClient() {
68-
return LOCAL_SQS_RULE.getLocalAmazonSqsAsync();
69-
}
7072
}
7173

7274
@Test
@@ -75,7 +77,7 @@ public void allMessagesAreProcessedByListeners() throws InterruptedException {
7577
IntStream.range(0, NUMBER_OF_MESSAGES_TO_SEND)
7678
.forEach(i -> {
7779
log.info("Sending message: " + i);
78-
localSqsAsyncClient.sendMessageToLocalQueue("QueueListenerWrapperIntegrationTest", "message: " + i);
80+
localSqsAsyncClient.sendMessageToLocalQueue(QUEUE_NAME, "message: " + i);
7981
});
8082

8183
// act

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/test/java/it/com/jashmore/sqs/container/batching/BatchingQueueListenerWrapperIntegrationTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
2828
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
2929

30-
import java.util.Map;
3130
import java.util.concurrent.CompletableFuture;
32-
import java.util.concurrent.ConcurrentHashMap;
3331
import java.util.concurrent.CountDownLatch;
3432
import java.util.concurrent.ExecutionException;
3533
import java.util.stream.IntStream;

0 commit comments

Comments
 (0)