11package org .reactivecommons .async .impl .config ;
22
33import lombok .RequiredArgsConstructor ;
4+ import org .reactivecommons .async .api .DefaultCommandHandler ;
5+ import org .reactivecommons .async .api .DefaultQueryHandler ;
6+ import org .reactivecommons .async .api .DynamicRegistry ;
7+ import org .reactivecommons .async .api .HandlerRegistry ;
8+ import org .reactivecommons .async .api .handlers .registered .RegisteredCommandHandler ;
9+ import org .reactivecommons .async .api .handlers .registered .RegisteredEventListener ;
10+ import org .reactivecommons .async .api .handlers .registered .RegisteredQueryHandler ;
411import org .reactivecommons .async .impl .DiscardNotifier ;
12+ import org .reactivecommons .async .impl .DynamicRegistryImp ;
513import org .reactivecommons .async .impl .HandlerResolver ;
614import org .reactivecommons .async .impl .communications .ReactiveMessageListener ;
15+ import org .reactivecommons .async .impl .communications .ReactiveMessageSender ;
716import org .reactivecommons .async .impl .config .props .AsyncProps ;
817import org .reactivecommons .async .impl .converters .MessageConverter ;
18+ import org .reactivecommons .async .impl .listeners .ApplicationCommandListener ;
919import org .reactivecommons .async .impl .listeners .ApplicationEventListener ;
20+ import org .reactivecommons .async .impl .listeners .ApplicationNotificationListener ;
21+ import org .reactivecommons .async .impl .listeners .ApplicationQueryListener ;
1022import org .springframework .beans .factory .annotation .Value ;
23+ import org .springframework .boot .autoconfigure .condition .ConditionalOnMissingBean ;
24+ import org .springframework .context .ApplicationContext ;
1125import org .springframework .context .annotation .Bean ;
1226import org .springframework .context .annotation .Configuration ;
1327import org .springframework .context .annotation .Import ;
28+ import reactor .core .publisher .Mono ;
29+
30+ import java .util .Map ;
31+ import java .util .concurrent .ConcurrentHashMap ;
32+ import java .util .concurrent .ConcurrentMap ;
1433
1534@ Configuration
1635@ RequiredArgsConstructor
36+ @ Deprecated
1737@ Import (RabbitMqConfig .class )
1838public class MessageListenersConfig {
1939
@@ -28,11 +48,103 @@ public ApplicationEventListener eventListener(HandlerResolver resolver, MessageC
2848 ReactiveMessageListener receiver , DiscardNotifier discardNotifier ) {
2949 final ApplicationEventListener listener = new ApplicationEventListener (receiver ,
3050 appName + ".subsEvents" , resolver , asyncProps .getDomain ().getEvents ().getExchange (),
31- messageConverter , asyncProps .getWithDLQRetry (), asyncProps .getMaxRetries (), asyncProps .getRetryDelay (),asyncProps .getDomain ().getEvents ().getMaxLengthBytes (),
32- discardNotifier );
51+ messageConverter , asyncProps .getWithDLQRetry (), asyncProps .getMaxRetries (), asyncProps .getRetryDelay (),
52+ asyncProps .getDomain ().getEvents ().getMaxLengthBytes (), discardNotifier );
53+ listener .startListener ();
54+ return listener ;
55+ }
3356
57+ @ Bean
58+ public ApplicationNotificationListener eventNotificationListener (HandlerResolver resolver , MessageConverter messageConverter ,
59+ ReactiveMessageListener receiver , DiscardNotifier discardNotifier ) {
60+ final ApplicationNotificationListener listener = new ApplicationNotificationListener (
61+ receiver ,
62+ asyncProps .getDomain ().getEvents ().getExchange (),
63+ asyncProps .getNotificationProps ().getQueueName (appName ),
64+ resolver ,
65+ messageConverter ,
66+ discardNotifier );
3467 listener .startListener ();
68+ return listener ;
69+ }
3570
71+ @ Bean //TODO: move to own config (QueryListenerConfig)
72+ public ApplicationQueryListener queryListener (MessageConverter converter , HandlerResolver resolver ,
73+ ReactiveMessageSender sender , ReactiveMessageListener rlistener ,
74+ DiscardNotifier discardNotifier ) {
75+ final ApplicationQueryListener listener = new ApplicationQueryListener (rlistener ,
76+ appName + ".query" , resolver , sender , asyncProps .getDirect ().getExchange (), converter ,
77+ asyncProps .getGlobal ().getExchange (), asyncProps .getWithDLQRetry (), asyncProps .getMaxRetries (),
78+ asyncProps .getRetryDelay (), asyncProps .getGlobal ().getMaxLengthBytes (), discardNotifier );
79+ listener .startListener ();
3680 return listener ;
3781 }
82+
83+ @ Bean
84+ public ApplicationCommandListener applicationCommandListener (ReactiveMessageListener listener ,
85+ HandlerResolver resolver , MessageConverter converter ,
86+ DiscardNotifier discardNotifier ) {
87+ ApplicationCommandListener commandListener = new ApplicationCommandListener (listener , appName , resolver ,
88+ asyncProps .getDirect ().getExchange (), converter , asyncProps .getWithDLQRetry (), asyncProps .getMaxRetries (),
89+ asyncProps .getRetryDelay (), asyncProps .getDirect ().getMaxLengthBytes (), discardNotifier );
90+ commandListener .startListener ();
91+ return commandListener ;
92+ }
93+
94+ @ Bean
95+ public DynamicRegistry dynamicRegistry (HandlerResolver resolver , ReactiveMessageListener listener , IBrokerConfigProps props ) {
96+ return new DynamicRegistryImp (resolver , listener .getTopologyCreator (), props );
97+ }
98+
99+ @ Bean
100+ public HandlerResolver resolver (ApplicationContext context , DefaultCommandHandler defaultCommandHandler ) {
101+ final Map <String , HandlerRegistry > registries = context .getBeansOfType (HandlerRegistry .class );
102+
103+ final ConcurrentMap <String , RegisteredQueryHandler > handlers = registries
104+ .values ().stream ()
105+ .flatMap (r -> r .getHandlers ().stream ())
106+ .collect (ConcurrentHashMap ::new , (map , handler ) -> map .put (handler .getPath (), handler ),
107+ ConcurrentHashMap ::putAll );
108+
109+ final ConcurrentMap <String , RegisteredEventListener > eventListeners = registries
110+ .values ().stream ()
111+ .flatMap (r -> r .getEventListeners ().stream ())
112+ .collect (ConcurrentHashMap ::new , (map , handler ) -> map .put (handler .getPath (), handler ),
113+ ConcurrentHashMap ::putAll );
114+
115+ final ConcurrentMap <String , RegisteredCommandHandler > commandHandlers = registries
116+ .values ().stream ()
117+ .flatMap (r -> r .getCommandHandlers ().stream ())
118+ .collect (ConcurrentHashMap ::new , (map , handler ) -> map .put (handler .getPath (), handler ),
119+ ConcurrentHashMap ::putAll );
120+
121+ final ConcurrentMap <String , RegisteredEventListener > eventNotificationListener = registries
122+ .values ()
123+ .stream ()
124+ .flatMap (r -> r .getEventNotificationListener ().stream ())
125+ .collect (ConcurrentHashMap ::new , (map , handler ) -> map .put (handler .getPath (), handler ),
126+ ConcurrentHashMap ::putAll );
127+
128+ return new HandlerResolver (handlers , eventListeners , commandHandlers , eventNotificationListener ) {
129+ @ Override
130+ @ SuppressWarnings ("unchecked" )
131+ public <T > RegisteredCommandHandler <T > getCommandHandler (String path ) {
132+ final RegisteredCommandHandler <T > handler = super .getCommandHandler (path );
133+ return handler != null ? handler : new RegisteredCommandHandler <>("" , defaultCommandHandler , Object .class );
134+ }
135+ };
136+ }
137+
138+ @ Bean
139+ @ ConditionalOnMissingBean
140+ public DefaultQueryHandler defaultHandler () {
141+ return (DefaultQueryHandler <Object , Object >) command ->
142+ Mono .error (new RuntimeException ("No Handler Registered" ));
143+ }
144+
145+ @ Bean
146+ @ ConditionalOnMissingBean
147+ public DefaultCommandHandler defaultCommandHandler () {
148+ return message -> Mono .error (new RuntimeException ("No Handler Registered" ));
149+ }
38150}
0 commit comments