Skip to content
This repository was archived by the owner on Nov 6, 2023. It is now read-only.

Commit 23ef6ef

Browse files
committed
Refactor the EventBus example polishing out some minor details:
* Use immutable `Set` instead of `List` as the subscribers collection while instantiating the `ReactorEventBus`: We don't need to guarantee the subscribers order (in fact, we should avoid depending on it in order to avoid creating dependencies between subscribers based on execution order) * Rename `VideoCreated` semantics by `VideoPublished` in order to bring more domain related language to the code instead of a CRUDy language * Rename `DomainEvent#domainEventName` to `DomainEvent#fullQualifiedEventName`. This way we are being explicit about using a FQEN naming convention (which has been also introduced in this commit: `vendor.bounded_context.subdomain.[event|command|query].version.resource.event_occured`) * Modify the `DomainEventSubscriber#subscribedTo` method return type from `String` to `Class<EventType extends DomainEvent>`. This way we gain a robuster contract avoiding possible mistakes returning any random string instead of the actual event we want to subscribe to. * This also has allowed us to not having to expose as public the constant with the event name and access it from the event bus implementation. That is, now we're having a really modullar implementation Open/Closed Principle compliant. * Set the FQEN constant as private and only expose as public the `DomainEvent#fullQualifiedEventName` method. This way we ensure by the `DomainEvent` interface that all subclasses will have the needed information in order to deal with them while serializing and so on (previously we were depending on the constant which isn't declared in the interface). I've left this `DomainEvent#fullQualifiedEventName` in order to use it when we publish the events to some distributed message broker. * Renamed the `EventBus#notify` method to `EventBus#publish`, and `DomainEventSubscriber#react` to `DomainEventSubscriber#consume`. This way we're respecting the same semantics we use while talking about Domain Events (subscribing, publishing, and consuming) without being polluted by the semantics of Java Reactor (`react`) * Refactor the `ReactorEventBus` class in order to use `Set#forEach` method and lambda functions extracted into their own methods in order to make it easier to read
1 parent 30fc804 commit 23ef6ef

File tree

8 files changed

+77
-56
lines changed

8 files changed

+77
-56
lines changed
Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
package tv.codely;
22

3-
import tv.codely.context.notification.module.push.application.create.SendPushToSubscribersOnVideoCreated;
4-
import tv.codely.context.video.module.video.domain.VideoCreated;
3+
import tv.codely.context.notification.module.push.application.create.SendPushToSubscribersOnVideoPublished;
4+
import tv.codely.context.video.module.video.domain.VideoPublished;
5+
import tv.codely.shared.application.DomainEventSubscriber;
6+
import tv.codely.shared.domain.EventBus;
57
import tv.codely.shared.infrastructure.bus.ReactorEventBus;
68

7-
import java.util.Arrays;
9+
import java.util.Set;
810

911
public class Starter {
1012
public static void main(String[] args) {
11-
var sendPushToSubscribersOnVideoCreated = new SendPushToSubscribersOnVideoCreated();
13+
final Set<DomainEventSubscriber> subscribers = Set.of(
14+
new SendPushToSubscribersOnVideoPublished()
15+
);
1216

13-
var eventBus = new ReactorEventBus(Arrays.asList(sendPushToSubscribersOnVideoCreated));
17+
final EventBus eventBus = new ReactorEventBus(subscribers);
1418

15-
eventBus.notify(new VideoCreated("Llegamos a 1M de subscribers!", "CodelyTV es una gran plataforma, CREMITA!"));
19+
final var videoPublished = new VideoPublished(
20+
"\uD83C\uDF89 New youtube.com/CodelyTV video title",
21+
"This should be the video description \uD83D\uDE42"
22+
);
23+
24+
eventBus.publish(videoPublished);
1625
}
1726
}
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
package tv.codely.context.notification.module.push.application.create;
22

3-
import tv.codely.context.video.module.video.domain.VideoCreated;
3+
import tv.codely.context.video.module.video.domain.VideoPublished;
44
import tv.codely.shared.application.DomainEventSubscriber;
55

6-
public class SendPushToSubscribersOnVideoCreated implements DomainEventSubscriber<VideoCreated> {
6+
public class SendPushToSubscribersOnVideoPublished implements DomainEventSubscriber<VideoPublished> {
77
@Override
8-
public String subscribedTo() {
9-
return VideoCreated.NAME;
8+
public Class<VideoPublished> subscribedTo() {
9+
return VideoPublished.class;
1010
}
1111

1212
@Override
13-
public void react(VideoCreated event) {
13+
public void consume(VideoPublished event) {
1414
System.out.println(
1515
String.format(
1616
"Hey! There is a new video with title <%s> and description <%s>",
17-
event.name(),
17+
event.title(),
1818
event.description()
1919
)
2020
);

src/main/java/tv/codely/context/video/module/video/domain/VideoCreated.java

Lines changed: 0 additions & 27 deletions
This file was deleted.
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package tv.codely.context.video.module.video.domain;
2+
3+
import tv.codely.shared.domain.DomainEvent;
4+
5+
public final class VideoPublished implements DomainEvent {
6+
private static final String FULL_QUALIFIED_EVENT_NAME = "codelytv.video.video.event.1.video.published";
7+
8+
private final String title;
9+
private final String description;
10+
11+
public VideoPublished(String title, String description) {
12+
this.title = title;
13+
this.description = description;
14+
}
15+
16+
public String fullQualifiedEventName() {
17+
return FULL_QUALIFIED_EVENT_NAME;
18+
}
19+
20+
public String title() {
21+
return title;
22+
}
23+
24+
public String description() {
25+
return description;
26+
}
27+
}

src/main/java/tv/codely/shared/application/DomainEventSubscriber.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import tv.codely.shared.domain.DomainEvent;
44

5-
public interface DomainEventSubscriber<Event extends DomainEvent> {
6-
String subscribedTo();
5+
public interface DomainEventSubscriber<EventType extends DomainEvent> {
6+
Class<EventType> subscribedTo();
77

8-
void react(Event event);
8+
void consume(EventType event);
99
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package tv.codely.shared.domain;
22

33
public interface DomainEvent {
4-
String domainEventName();
4+
String fullQualifiedEventName();
55
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package tv.codely.shared.domain;
22

33
public interface EventBus {
4-
void notify(DomainEvent event);
4+
void publish(DomainEvent event);
55
}

src/main/java/tv/codely/shared/infrastructure/bus/ReactorEventBus.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,42 @@
22

33
import reactor.bus.Event;
44
import reactor.bus.EventBus;
5+
import reactor.bus.selector.Selector;
6+
import reactor.fn.Consumer;
57
import tv.codely.shared.application.DomainEventSubscriber;
68
import tv.codely.shared.domain.DomainEvent;
79

8-
import java.util.List;
10+
import java.util.Set;
911

1012
import static reactor.bus.selector.Selectors.$;
1113

1214
public class ReactorEventBus implements tv.codely.shared.domain.EventBus {
1315
private final EventBus bus;
1416

15-
public ReactorEventBus(List<DomainEventSubscriber> subscribers) {
16-
this.bus = EventBus.create();
17+
public ReactorEventBus(final Set<DomainEventSubscriber> subscribers) {
18+
bus = EventBus.create();
1719

18-
for (DomainEventSubscriber subscriber : subscribers)
19-
{
20-
this.bus.on(
21-
$(subscriber.subscribedTo()),
22-
reactorEvent -> subscriber.react((DomainEvent) reactorEvent.getData())
23-
);
24-
}
20+
subscribers.forEach(this::registerOnEventBus);
2521
}
2622

2723
@Override
28-
public void notify(DomainEvent event) {
29-
this.bus.notify(event.domainEventName(), Event.wrap(event));
24+
public void publish(final DomainEvent event) {
25+
Class<? extends DomainEvent> eventIdentifier = event.getClass();
26+
Event<DomainEvent> wrappedEvent = Event.wrap(event);
27+
28+
bus.notify(eventIdentifier, wrappedEvent);
29+
}
30+
31+
private void registerOnEventBus(final DomainEventSubscriber subscriber) {
32+
final Selector eventIdentifier = $(subscriber.subscribedTo());
33+
34+
bus.on(eventIdentifier, eventConsumer(subscriber));
35+
}
36+
37+
private Consumer<Event> eventConsumer(final DomainEventSubscriber subscriber) {
38+
return (Event reactorEvent) -> {
39+
DomainEvent unwrappedEvent = (DomainEvent) reactorEvent.getData();
40+
subscriber.consume(unwrappedEvent);
41+
};
3042
}
3143
}

0 commit comments

Comments
 (0)