Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static reactor.core.publisher.Flux.range;
Expand All @@ -32,8 +29,6 @@ class QueryProcessPerfTest {
private static final String QUERY_NAME = "app.command.test";
private static final int MESSAGE_COUNT = 40000;
private static final Semaphore semaphore = new Semaphore(0);
private static final AtomicLong atomicLong = new AtomicLong(0);
private static final CountDownLatch latch = new CountDownLatch(12 + 1);

@Autowired
private DirectAsyncGateway gateway;
Expand Down Expand Up @@ -75,7 +70,7 @@ private void assertMessageThroughput(long total, long messageCount, int reqMicro
private Flux<AsyncQuery<DummyMessage>> createMessages(int count) {
final List<AsyncQuery<DummyMessage>> queryList = IntStream.range(0, count)
.mapToObj(_v -> new AsyncQuery<>(QUERY_NAME, new DummyMessage()))
.collect(Collectors.toList());
.toList();
return Flux.fromIterable(queryList);
}

Expand All @@ -95,6 +90,7 @@ public HandlerRegistry registry() {
"app.command.name" + i, message -> Mono.empty(), Map.class
))
.block();
assert registry != null;
return registry
.serveQuery(QUERY_NAME, this::handleSimple, DummyMessage.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
import org.reactivecommons.async.api.handlers.RawCommandHandler;
import org.reactivecommons.async.api.handlers.RawEventHandler;
import org.reactivecommons.async.api.handlers.TopologyHandlerSetup;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredDomainHandlers;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
import reactor.core.publisher.Mono;

import java.lang.reflect.ParameterizedType;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -35,6 +38,8 @@ public final class HandlerRegistry {
private final RegisteredDomainHandlers<RegisteredQueryHandler<?, ?>> handlers = new RegisteredDomainHandlers<>();
private final RegisteredDomainHandlers<RegisteredCommandHandler<?, ?>> commandHandlers =
new RegisteredDomainHandlers<>();
private final RegisteredDomainHandlers<RegisteredQueueListener> queueHandlers =
new RegisteredDomainHandlers<>();


public static HandlerRegistry register() {
Expand Down Expand Up @@ -142,12 +147,12 @@ public HandlerRegistry handleCloudEventCommand(String domain, String commandName
}

// commands: RawMessage
public HandlerRegistry handleRawCommand(String commandName, RawCommandHandler<?> handler) {
return handleRawCommand(DEFAULT_DOMAIN, commandName, handler);
public HandlerRegistry handleRawCommand(RawCommandHandler<?> handler) {
return handleRawCommand(DEFAULT_DOMAIN, handler);
}

public HandlerRegistry handleRawCommand(String domain, String commandName, RawCommandHandler<?> handler) {
commandHandlers.add(domain, new RegisteredCommandHandler<>(commandName, handler, RawMessage.class));
public HandlerRegistry handleRawCommand(String domain, RawCommandHandler<?> handler) {
commandHandlers.add(domain, new RegisteredCommandHandler<>("", handler, RawMessage.class));
return this;
}

Expand Down Expand Up @@ -176,6 +181,26 @@ public HandlerRegistry serveCloudEventQuery(String resource, QueryHandlerDelegat
return this;
}

// Queues
public HandlerRegistry listenQueue(String queueName, RawEventHandler<RawMessage> handler) {
return listenQueue(queueName, handler, creator -> Mono.empty());
}

public HandlerRegistry listenQueue(String domain, String queueName, RawEventHandler<RawMessage> handler) {
return listenQueue(domain, queueName, handler, creator -> Mono.empty());
}

public HandlerRegistry listenQueue(String queueName, RawEventHandler<RawMessage> handler,
TopologyHandlerSetup topologyCreator) {
queueHandlers.add(DEFAULT_DOMAIN, new RegisteredQueueListener(queueName, handler, topologyCreator));
return this;
}

public HandlerRegistry listenQueue(String domain, String queueName, RawEventHandler<RawMessage> handler,
TopologyHandlerSetup topologyCreator) {
queueHandlers.add(domain, new RegisteredQueueListener(queueName, handler, topologyCreator));
return this;
}

@Deprecated(forRemoval = true)
public <T> HandlerRegistry listenEvent(String eventName, DomainEventHandler<T> handler) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.reactivecommons.async.api.handlers;

import reactor.core.publisher.Mono;

/**
* Interface for setting up topology handlers.
*/
@FunctionalInterface
public interface TopologyHandlerSetup {
Mono<Void> setup(Object topologyCreator);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.reactivecommons.async.api.handlers.registered;

import org.reactivecommons.api.domain.RawMessage;
import org.reactivecommons.async.api.handlers.RawEventHandler;
import org.reactivecommons.async.api.handlers.TopologyHandlerSetup;


public record RegisteredQueueListener(String queueName,
RawEventHandler<RawMessage> handler,
TopologyHandlerSetup topologyHandlerSetup) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
class HandlerRegistryTest {
private final HandlerRegistry registry = HandlerRegistry.register();
private final String name = "some.event";
private final String nameRaw = "some.raw.event";
private final String nameRawNotification = "some.raw.notification.event";
private final String domain = "some-domain";

Expand Down Expand Up @@ -222,14 +221,14 @@ void handleCloudEventCommand() {
void handleRawCommand() {
SomeRawCommandEventHandler eventHandler = new SomeRawCommandEventHandler();

registry.handleRawCommand(nameRaw, eventHandler);
registry.handleRawCommand(eventHandler);

assertThat(registry.getCommandHandlers().get(DEFAULT_DOMAIN))
.anySatisfy(registered -> assertThat(registered)
.extracting(RegisteredCommandHandler::path, RegisteredCommandHandler::inputClass,
RegisteredCommandHandler::handler
)
.containsExactly(nameRaw, RawMessage.class, eventHandler)).hasSize(1);
.containsExactly("", RawMessage.class, eventHandler)).hasSize(1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.reactivecommons.async.commons;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
import org.reactivecommons.async.commons.utils.matcher.KeyMatcher;
import org.reactivecommons.async.commons.utils.matcher.Matcher;

Expand All @@ -23,6 +25,8 @@ public class HandlerResolver {
private final Map<String, RegisteredEventListener<?, ?>> eventsToBind;
private final Map<String, RegisteredEventListener<?, ?>> eventNotificationListeners;
private final Map<String, RegisteredCommandHandler<?, ?>> commandHandlers;
@Getter
private final Map<String, RegisteredQueueListener> queueListeners;

private final Matcher matcher = new KeyMatcher();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
import org.reactivecommons.async.commons.HandlerResolver;

import java.util.List;
Expand Down Expand Up @@ -42,7 +43,17 @@ public static HandlerResolver buildResolver(String domain, Map<String, HandlerRe
-> map.put(handler.path(), handler), ConcurrentHashMap::putAll
);

final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventNotificationListener = registries
final ConcurrentMap<String, RegisteredQueueListener> queueListeners = registries
.values()
.stream()
.flatMap(r -> r.getQueueHandlers()
.getOrDefault(domain, List.of())
.stream())
.collect(ConcurrentHashMap::new, (map, handler)
-> map.put(handler.queueName(), handler), ConcurrentHashMap::putAll
);

final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventNotificationListeners = registries
.values()
.stream()
.flatMap(r -> r.getEventNotificationListener()
Expand All @@ -55,11 +66,11 @@ public static HandlerResolver buildResolver(String domain, Map<String, HandlerRe
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventsToBind = getEventsToBind(domain,
registries);

final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventHandlers =
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventListeners =
getEventHandlersWithDynamics(domain, registries);

return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener,
commandHandlers) {
return new HandlerResolver(queryHandlers, eventListeners, eventsToBind, eventNotificationListeners,
commandHandlers, queueListeners) {
@Override
@SuppressWarnings("unchecked")
public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static class RabbitMessageProperties implements Properties {
}

public static RabbitMessage fromDelivery(Delivery delivery) {
return fromDelivery(delivery, null);
return fromDelivery(delivery, "");
}

public static RabbitMessage fromDelivery(Delivery delivery, String executorPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import lombok.extern.java.Log;
import org.reactivecommons.async.api.handlers.CloudCommandHandler;
import org.reactivecommons.async.api.handlers.DomainCommandHandler;
import org.reactivecommons.async.api.handlers.RawCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.commons.CommandExecutor;
import org.reactivecommons.async.commons.DiscardNotifier;
Expand Down Expand Up @@ -63,6 +64,7 @@ public ApplicationCommandListener(ReactiveMessageListener listener,
this.maxLengthBytes = maxLengthBytes;
}

@Override
protected Mono<Void> setUpBindings(TopologyCreator creator) {
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(
ExchangeSpecification.exchange(directExchange).durable(true).type("direct")
Expand Down Expand Up @@ -128,7 +130,10 @@ protected String getExecutorPath(AcknowledgableDelivery msj) {
if (jsonNode.get(COMMAND_ID) != null) {
return jsonNode.get(NAME).asText();
}
return jsonNode.get(TYPE).asText();
if (jsonNode.get(TYPE) != null) {
return jsonNode.get(TYPE).asText();
}
return rabbitMessage.getType();
}

@Override
Expand All @@ -142,6 +147,8 @@ private <T, D> Function<Message, Object> resolveConverter(RegisteredCommandHandl
return msj -> messageConverter.readCommand(msj, commandClass);
} else if (registeredCommandHandler.handler() instanceof CloudCommandHandler) {
return messageConverter::readCloudEvent;
} else if (registeredCommandHandler.handler() instanceof RawCommandHandler) {
return message -> message;
}
throw new RuntimeException("Unknown handler type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public ApplicationEventListener(ReactiveMessageListener receiver,
this.appName = appName;
}

@Override
protected Mono<Void> setUpBindings(TopologyCreator creator) {
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(
ExchangeSpecification.exchange(eventsExchange).durable(true).type("topic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public ApplicationNotificationListener(ReactiveMessageListener receiver,
this.createTopology = createTopology;
}

@Override
protected Mono<Void> setUpBindings(TopologyCreator creator) {

final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.reactivecommons.async.rabbit.listeners;

import lombok.extern.java.Log;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;

import java.util.function.Function;

@Log
public class ApplicationQueueListener extends GenericMessageListener {

private final RegisteredQueueListener registeredListener;

public ApplicationQueueListener(ReactiveMessageListener listener,
boolean withDLQRetry,
long maxRetries,
int retryDelay,
RegisteredQueueListener registeredListener,
DiscardNotifier discardNotifier,
CustomReporter errorReporter) {
super(registeredListener.queueName(), listener, withDLQRetry, true, maxRetries, retryDelay,
discardNotifier, "queue", errorReporter);
this.registeredListener = registeredListener;
}

@Override
protected Mono<Void> setUpBindings(TopologyCreator creator) {
return registeredListener.topologyHandlerSetup().setup(creator);
}

@Override
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
return message -> registeredListener.handler().handle(message).cast(Object.class);
}

protected String getExecutorPath(AcknowledgableDelivery msj) {
return registeredListener.queueName();
}

@Override
protected Object parseMessageForReporter(Message message) {
return message;
}

@Override
protected String getKind() {
return "queue";
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.reactivecommons.async.rabbit.listeners;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.extern.java.Log;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.FallbackStrategy;
Expand Down Expand Up @@ -101,16 +100,25 @@ public void startListener() {
if (createTopology) {
this.messageFlux = setUpBindings(messageListener.getTopologyCreator())
.thenMany(receiver.consumeManualAck(queueName, consumeOptions)
.doOnError(err -> log.log(Level.SEVERE, "Error listening queue " + getRootCauseMessage(err), err))
.transform(this::consumeFaultTolerant));
} else {
this.messageFlux = receiver.consumeManualAck(queueName, consumeOptions)
.doOnError(err -> log.log(Level.SEVERE, "Error listening queue", err))
.doOnError(err -> log.log(Level.SEVERE, "Error listening queue " + getRootCauseMessage(err), err))
.transform(this::consumeFaultTolerant);
}

onTerminate();
}

private String getRootCauseMessage(Throwable err) {
Throwable root = err;
while (root.getCause() != null && root.getCause() != root) {
root = root.getCause();
}
return root.getMessage();
}

private Flux<AcknowledgableDelivery> consumeFaultTolerant(Flux<AcknowledgableDelivery> messageFlux) {
return messageFlux.flatMap(msj -> {
final Instant init = Instant.now();
Expand Down Expand Up @@ -180,7 +188,6 @@ private void doLogExecution(String executorPath, long timeElapsed) {
objectType, executorPath, timeElapsed));
}


protected void logError(Throwable err, AcknowledgableDelivery msj, FallbackStrategy strategy) {
String messageID = msj.getProperties().getMessageId();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.commons.config.IBrokerConfigProps;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
Expand Down Expand Up @@ -51,7 +52,10 @@ void setUp() {
Map<String, RegisteredEventListener<?,?>> eventsToBind = new ConcurrentHashMap<>();
Map<String, RegisteredEventListener<?,?>> notificationEventListeners = new ConcurrentHashMap<>();
Map<String, RegisteredQueryHandler<?, ?>> queryHandlers = new ConcurrentHashMap<>();
resolver = new HandlerResolver(queryHandlers, eventListeners, eventsToBind, notificationEventListeners, commandHandlers);
Map<String, RegisteredQueueListener> queueHandlers = new ConcurrentHashMap<>();
resolver = new HandlerResolver(
queryHandlers, eventListeners, eventsToBind, notificationEventListeners, commandHandlers, queueHandlers
);
dynamicRegistry = new DynamicRegistryImp(resolver, topologyCreator, props);
}

Expand Down
Loading