fireAsync(T event, String qualifier) {
} catch (Exception e) {
exceptionHandler.handle(e);
}
- }),
+ }, manager.asyncExecutor()),
(future1, future2) -> future1)
.thenApply(v -> {
handleExceptions(exceptionHandler);
diff --git a/inject-events/src/main/java/io/avaje/inject/events/ObserverManager.java b/inject-events/src/main/java/io/avaje/inject/events/ObserverManager.java
index 762a1bdd9..7c9f052b4 100644
--- a/inject-events/src/main/java/io/avaje/inject/events/ObserverManager.java
+++ b/inject-events/src/main/java/io/avaje/inject/events/ObserverManager.java
@@ -2,29 +2,33 @@
import java.lang.reflect.Type;
import java.util.List;
+import java.util.concurrent.Executor;
/**
* Manages all {@link Observer} instances in the BeanScope.
- *
- * A default implementation is provided by avaje-inject.
+ *
+ *
A default implementation is provided by avaje-inject.
*/
public interface ObserverManager {
/**
* Registers the given Consumer as an observer.
*
- * @param the type of the event
+ * @param the type of the event
* @param eventType the type of the event ()
- * @param observer the consumer to execute when a matching event is found
+ * @param observer the consumer to execute when a matching event is found
*/
void registerObserver(Type eventType, Observer observer);
/**
* Retrieves a list of all Observers registered by the given type
*
- * @param the Type of the Event
+ * @param the Type of the Event
* @param eventType the type of the event
* @return all observers registered
*/
List> observersByType(Type eventType);
+
+ /** The Executor used for sending async events */
+ Executor asyncExecutor();
}
diff --git a/inject-events/src/main/java/io/avaje/inject/events/spi/DObserverManager.java b/inject-events/src/main/java/io/avaje/inject/events/spi/DObserverManager.java
index 7c7d0e0e7..9de80b193 100644
--- a/inject-events/src/main/java/io/avaje/inject/events/spi/DObserverManager.java
+++ b/inject-events/src/main/java/io/avaje/inject/events/spi/DObserverManager.java
@@ -5,13 +5,22 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import io.avaje.inject.PostConstruct;
import io.avaje.inject.events.Observer;
import io.avaje.inject.events.ObserverManager;
final class DObserverManager implements ObserverManager {
- private final Map>> observeMap = new HashMap<>();
+ private final Map>> observeMap = new HashMap<>();
+ private Executor executor = ForkJoinPool.commonPool();
+
+ @PostConstruct
+ void post(Executor executor) {
+ this.executor = executor;
+ }
@Override
public void registerObserver(Type type, Observer observer) {
@@ -23,4 +32,9 @@ public void registerObserver(Type type, Observer observer) {
public List> observersByType(Type eventType) {
return observeMap.computeIfAbsent(eventType, k -> new ArrayList<>());
}
+
+ @Override
+ public Executor asyncExecutor() {
+ return executor;
+ }
}
diff --git a/inject-events/src/main/java/io/avaje/inject/events/spi/ObserverManagerPlugin.java b/inject-events/src/main/java/io/avaje/inject/events/spi/ObserverManagerPlugin.java
index d265b3fe3..bc76f7706 100644
--- a/inject-events/src/main/java/io/avaje/inject/events/spi/ObserverManagerPlugin.java
+++ b/inject-events/src/main/java/io/avaje/inject/events/spi/ObserverManagerPlugin.java
@@ -1,5 +1,8 @@
package io.avaje.inject.events.spi;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+
import io.avaje.inject.BeanScopeBuilder;
import io.avaje.inject.spi.InjectPlugin;
@@ -14,5 +17,9 @@ public Class>[] provides() {
@Override
public void apply(BeanScopeBuilder builder) {
builder.provideDefault(null, DObserverManager.class, DObserverManager::new);
+ builder.addPostConstruct(
+ b ->
+ b.get(DObserverManager.class)
+ .post(b.getOptional(ExecutorService.class).orElse(ForkJoinPool.commonPool())));
}
}
diff --git a/inject-events/src/test/java/io/avaje/inject/events/spi/DObserverManagerTest.java b/inject-events/src/test/java/io/avaje/inject/events/spi/DObserverManagerTest.java
index 6c878b1af..366b6b639 100644
--- a/inject-events/src/test/java/io/avaje/inject/events/spi/DObserverManagerTest.java
+++ b/inject-events/src/test/java/io/avaje/inject/events/spi/DObserverManagerTest.java
@@ -6,20 +6,19 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;
import io.avaje.inject.events.Observer;
-import io.avaje.inject.events.ObserverManager;
import io.avaje.inject.events.events.TestEvent;
import io.avaje.inject.events.events.TestGenericEvent;
import io.avaje.inject.spi.GenericType;
class DObserverManagerTest {
- ObserverManager manager = new DObserverManager();
+ DObserverManager manager = new DObserverManager();
@Test
void test() {
@@ -36,7 +35,7 @@ void test() {
}
@Test
- void testPriority() throws InterruptedException, ExecutionException {
+ void testPriority() {
var l = new ArrayList();
manager.registerObserver(String.class, new Observer<>(0, false, s -> l.add("1"), ""));
@@ -48,42 +47,55 @@ void testPriority() throws InterruptedException, ExecutionException {
}
@Test
- void testAsync() throws InterruptedException, ExecutionException {
+ void testAsync() {
AtomicBoolean aBoolean = new AtomicBoolean();
manager.registerObserver(
String.class, new Observer<>(0, true, s -> aBoolean.set(true), ""));
- new TestEvent(manager).fireAsync("str").toCompletableFuture().get();
+ new TestEvent(manager).fireAsync("str").toCompletableFuture().join();
assertThat(aBoolean.get()).isTrue();
}
@Test
- void testAsyncPriority() throws InterruptedException, ExecutionException {
+ void testAsyncExecutor() {
+ AtomicBoolean aBoolean = new AtomicBoolean();
+ var exec = Executors.newSingleThreadScheduledExecutor();
+ manager.post(exec);
+ assertThat(exec).isSameAs(manager.asyncExecutor());
+ manager.registerObserver(
+ String.class, new Observer<>(0, true, s -> aBoolean.set(true), ""));
+
+ new TestEvent(manager).fireAsync("str").toCompletableFuture().join();
+ assertThat(aBoolean.get()).isTrue();
+ }
+
+ @Test
+ void testAsyncPriority() {
var l = new ArrayList();
manager.registerObserver(String.class, new Observer<>(0, true, s -> l.add("1"), ""));
manager.registerObserver(String.class, new Observer<>(5, true, s -> l.add("5"), ""));
manager.registerObserver(String.class, new Observer<>(2, true, s -> l.add("2"), ""));
- new TestEvent(manager).fireAsync("str").toCompletableFuture().get();
+ new TestEvent(manager).fireAsync("str").toCompletableFuture().join();
assertThat(l).containsExactly("1", "2", "5");
}
@Test
- void testGenericAsync() throws InterruptedException, ExecutionException {
+ void testGenericAsync() {
AtomicBoolean aBoolean = new AtomicBoolean();
manager.>registerObserver(
new GenericType>() {}.type(),
new Observer<>(0, true, s -> aBoolean.set(true), ""));
- new TestGenericEvent(manager).fireAsync(List.of("str")).toCompletableFuture().get();
+ new TestGenericEvent(manager).fireAsync(List.of("str")).toCompletableFuture().join();
assertThat(aBoolean.get()).isTrue();
}
@Test
- void testError() throws InterruptedException {
+ void testError() {
final var testEvent = new TestEvent(manager);
testEvent.fire("sus");
@@ -102,7 +114,7 @@ void testError() throws InterruptedException {
}
@Test
- void testAsyncError() throws InterruptedException, ExecutionException {
+ void testAsyncError() {
final var testEvent = new TestEvent(manager);
manager.registerObserver(