diff --git a/inject-events/pom.xml b/inject-events/pom.xml index d81eae7f9..4cc3e4db3 100644 --- a/inject-events/pom.xml +++ b/inject-events/pom.xml @@ -12,7 +12,7 @@ io.avaje avaje-inject - 10.0 + 12.0 provided diff --git a/inject-events/src/main/java/io/avaje/inject/events/Event.java b/inject-events/src/main/java/io/avaje/inject/events/Event.java index ee8cf7a82..62c5518ad 100644 --- a/inject-events/src/main/java/io/avaje/inject/events/Event.java +++ b/inject-events/src/main/java/io/avaje/inject/events/Event.java @@ -15,20 +15,18 @@ * injected: * *
{@code
- *
- *   @Inject
- *   Event loggedInEvent;
+ * @Inject
+ * Event loggedInEvent;
  *
  * }
* *

The fire() method accepts an event object: * *

{@code
- *
- *   public void login() {
- *     ...
- *     loggedInEvent.fire(new LoggedInEvent(user));
- *   }
+ * public void login() {
+ *   ...
+ *   loggedInEvent.fire(new LoggedInEvent(user));
+ * }
  *
  * }
* @@ -37,7 +35,7 @@ public abstract class Event { private static final Comparator> PRIORITY = Comparator.comparing(Observer::priority); - + protected final ObserverManager manager; protected final List> observers; protected final String defaultQualifier; @@ -46,6 +44,7 @@ protected Event(ObserverManager manager, Type type) { } protected Event(ObserverManager manager, Type type, String qualifier) { + this.manager = manager; this.observers = manager.observersByType(type); this.defaultQualifier = qualifier; } @@ -72,7 +71,6 @@ public void fire(T event, String qualifier) { */ public CompletionStage fireAsync(T event, String qualifier) { var exceptionHandler = new CollectingExceptionHandler(); - return observers.stream() .sorted(PRIORITY) .reduce(CompletableFuture.completedFuture(null), (future, observer) -> @@ -82,7 +80,7 @@ public CompletionStage fireAsync(T event, String qualifier) { } catch (Exception e) { exceptionHandler.handle(e); } - }), + }, manager.asyncExecutor()), (future1, future2) -> future1) .thenApply(v -> { handleExceptions(exceptionHandler); diff --git a/inject-events/src/main/java/io/avaje/inject/events/ObserverManager.java b/inject-events/src/main/java/io/avaje/inject/events/ObserverManager.java index 762a1bdd9..7c9f052b4 100644 --- a/inject-events/src/main/java/io/avaje/inject/events/ObserverManager.java +++ b/inject-events/src/main/java/io/avaje/inject/events/ObserverManager.java @@ -2,29 +2,33 @@ import java.lang.reflect.Type; import java.util.List; +import java.util.concurrent.Executor; /** * Manages all {@link Observer} instances in the BeanScope. - *

- * A default implementation is provided by avaje-inject. + * + *

A default implementation is provided by avaje-inject. */ public interface ObserverManager { /** * Registers the given Consumer as an observer. * - * @param the type of the event + * @param the type of the event * @param eventType the type of the event () - * @param observer the consumer to execute when a matching event is found + * @param observer the consumer to execute when a matching event is found */ void registerObserver(Type eventType, Observer observer); /** * Retrieves a list of all Observers registered by the given type * - * @param the Type of the Event + * @param the Type of the Event * @param eventType the type of the event * @return all observers registered */ List> observersByType(Type eventType); + + /** The Executor used for sending async events */ + Executor asyncExecutor(); } diff --git a/inject-events/src/main/java/io/avaje/inject/events/spi/DObserverManager.java b/inject-events/src/main/java/io/avaje/inject/events/spi/DObserverManager.java index 7c7d0e0e7..9de80b193 100644 --- a/inject-events/src/main/java/io/avaje/inject/events/spi/DObserverManager.java +++ b/inject-events/src/main/java/io/avaje/inject/events/spi/DObserverManager.java @@ -5,13 +5,22 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import io.avaje.inject.PostConstruct; import io.avaje.inject.events.Observer; import io.avaje.inject.events.ObserverManager; final class DObserverManager implements ObserverManager { - private final Map>> observeMap = new HashMap<>(); + private final Map>> observeMap = new HashMap<>(); + private Executor executor = ForkJoinPool.commonPool(); + + @PostConstruct + void post(Executor executor) { + this.executor = executor; + } @Override public void registerObserver(Type type, Observer observer) { @@ -23,4 +32,9 @@ public void registerObserver(Type type, Observer observer) { public List> observersByType(Type eventType) { return observeMap.computeIfAbsent(eventType, k -> new ArrayList<>()); } + + @Override + public Executor asyncExecutor() { + return executor; + } } diff --git a/inject-events/src/main/java/io/avaje/inject/events/spi/ObserverManagerPlugin.java b/inject-events/src/main/java/io/avaje/inject/events/spi/ObserverManagerPlugin.java index d265b3fe3..bc76f7706 100644 --- a/inject-events/src/main/java/io/avaje/inject/events/spi/ObserverManagerPlugin.java +++ b/inject-events/src/main/java/io/avaje/inject/events/spi/ObserverManagerPlugin.java @@ -1,5 +1,8 @@ package io.avaje.inject.events.spi; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; + import io.avaje.inject.BeanScopeBuilder; import io.avaje.inject.spi.InjectPlugin; @@ -14,5 +17,9 @@ public Class[] provides() { @Override public void apply(BeanScopeBuilder builder) { builder.provideDefault(null, DObserverManager.class, DObserverManager::new); + builder.addPostConstruct( + b -> + b.get(DObserverManager.class) + .post(b.getOptional(ExecutorService.class).orElse(ForkJoinPool.commonPool()))); } } diff --git a/inject-events/src/test/java/io/avaje/inject/events/spi/DObserverManagerTest.java b/inject-events/src/test/java/io/avaje/inject/events/spi/DObserverManagerTest.java index 6c878b1af..366b6b639 100644 --- a/inject-events/src/test/java/io/avaje/inject/events/spi/DObserverManagerTest.java +++ b/inject-events/src/test/java/io/avaje/inject/events/spi/DObserverManagerTest.java @@ -6,20 +6,19 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; import io.avaje.inject.events.Observer; -import io.avaje.inject.events.ObserverManager; import io.avaje.inject.events.events.TestEvent; import io.avaje.inject.events.events.TestGenericEvent; import io.avaje.inject.spi.GenericType; class DObserverManagerTest { - ObserverManager manager = new DObserverManager(); + DObserverManager manager = new DObserverManager(); @Test void test() { @@ -36,7 +35,7 @@ void test() { } @Test - void testPriority() throws InterruptedException, ExecutionException { + void testPriority() { var l = new ArrayList(); manager.registerObserver(String.class, new Observer<>(0, false, s -> l.add("1"), "")); @@ -48,42 +47,55 @@ void testPriority() throws InterruptedException, ExecutionException { } @Test - void testAsync() throws InterruptedException, ExecutionException { + void testAsync() { AtomicBoolean aBoolean = new AtomicBoolean(); manager.registerObserver( String.class, new Observer<>(0, true, s -> aBoolean.set(true), "")); - new TestEvent(manager).fireAsync("str").toCompletableFuture().get(); + new TestEvent(manager).fireAsync("str").toCompletableFuture().join(); assertThat(aBoolean.get()).isTrue(); } @Test - void testAsyncPriority() throws InterruptedException, ExecutionException { + void testAsyncExecutor() { + AtomicBoolean aBoolean = new AtomicBoolean(); + var exec = Executors.newSingleThreadScheduledExecutor(); + manager.post(exec); + assertThat(exec).isSameAs(manager.asyncExecutor()); + manager.registerObserver( + String.class, new Observer<>(0, true, s -> aBoolean.set(true), "")); + + new TestEvent(manager).fireAsync("str").toCompletableFuture().join(); + assertThat(aBoolean.get()).isTrue(); + } + + @Test + void testAsyncPriority() { var l = new ArrayList(); manager.registerObserver(String.class, new Observer<>(0, true, s -> l.add("1"), "")); manager.registerObserver(String.class, new Observer<>(5, true, s -> l.add("5"), "")); manager.registerObserver(String.class, new Observer<>(2, true, s -> l.add("2"), "")); - new TestEvent(manager).fireAsync("str").toCompletableFuture().get(); + new TestEvent(manager).fireAsync("str").toCompletableFuture().join(); assertThat(l).containsExactly("1", "2", "5"); } @Test - void testGenericAsync() throws InterruptedException, ExecutionException { + void testGenericAsync() { AtomicBoolean aBoolean = new AtomicBoolean(); manager.>registerObserver( new GenericType>() {}.type(), new Observer<>(0, true, s -> aBoolean.set(true), "")); - new TestGenericEvent(manager).fireAsync(List.of("str")).toCompletableFuture().get(); + new TestGenericEvent(manager).fireAsync(List.of("str")).toCompletableFuture().join(); assertThat(aBoolean.get()).isTrue(); } @Test - void testError() throws InterruptedException { + void testError() { final var testEvent = new TestEvent(manager); testEvent.fire("sus"); @@ -102,7 +114,7 @@ void testError() throws InterruptedException { } @Test - void testAsyncError() throws InterruptedException, ExecutionException { + void testAsyncError() { final var testEvent = new TestEvent(manager); manager.registerObserver(