Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,13 @@ jobs:
distribution: temurin
java-version: 17
- name: Execute build test jacocoTestReport and sonar analysis
if: endsWith(github.REF, '/master') == true
if: endsWith(github.REF, '/master') == true || github.event.pull_request.head.repo.fork == false
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: ./gradlew build test jacocoTestReport sonar --refresh-dependencies --no-daemon --continue -Denv.ci=true
run: ./gradlew clean build generateMergedReport sonar --refresh-dependencies --no-daemon --continue -Denv.ci=true
- name: Execute build test jacocoTestReport pull request
if: endsWith(github.REF, '/merge') == true
if: github.event.pull_request.head.repo.fork == true
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: ./gradlew build test jacocoTestReport --refresh-dependencies --no-daemon --continue -Denv.ci=true
run: ./gradlew clean build generateMergedReport --refresh-dependencies --no-daemon --continue -Denv.ci=true
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.reactivecommons.async.commons.config;

import lombok.Getter;

import java.time.Duration;
import java.util.UUID;

@Getter
public class BrokerConfig {
private final String routingKey = UUID.randomUUID().toString().replaceAll("-", "");
private final boolean persistentQueries;
Expand All @@ -24,24 +27,4 @@ public BrokerConfig(boolean persistentQueries, boolean persistentCommands, boole
this.replyTimeout = replyTimeout;
}

public boolean isPersistentQueries() {
return persistentQueries;
}

public boolean isPersistentCommands() {
return persistentCommands;
}

public boolean isPersistentEvents() {
return persistentEvents;
}

public Duration getReplyTimeout() {
return replyTimeout;
}

public String getRoutingKey() {
return routingKey;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.reactivecommons.async.kafka;

import io.cloudevents.CloudEvent;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.async.api.AsyncQuery;
import org.reactivecommons.async.api.DirectAsyncGateway;
import org.reactivecommons.async.api.From;
import reactor.core.publisher.Mono;

public class KafkaDirectAsyncGateway implements DirectAsyncGateway {

public static final String NOT_IMPLEMENTED_YET = "Not implemented yet";

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> reply(T response, From from) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,26 @@

@AllArgsConstructor
public class KafkaDomainEventBus implements DomainEventBus {
public static final String NOT_IMPLEMENTED_YET = "Not implemented yet";
private final ReactiveMessageSender sender;

@Override
public <T> Publisher<Void> emit(DomainEvent<T> event) {
return sender.send(event);
}

@Override
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Publisher<Void> emit(CloudEvent event) {
return sender.send(event);
}

@Override
public Publisher<Void> emit(String domain, CloudEvent event) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.reactivecommons.async.rabbit;

import io.cloudevents.CloudEvent;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;

import java.util.Collections;

Expand All @@ -29,7 +29,12 @@ public RabbitDomainEventBus(ReactiveMessageSender sender, String exchange, Broke
@Override
public <T> Mono<Void> emit(DomainEvent<T> event) {
return sender.sendWithConfirm(event, exchange, event.getName(), Collections.emptyMap(), persistentEvents)
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
}

@Override
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
Expand All @@ -39,4 +44,9 @@ public Publisher<Void> emit(CloudEvent cloudEvent) {
.onErrorMap(err -> new RuntimeException("Event send failure: " + cloudEvent.getType(), err));
}

@Override
public Publisher<Void> emit(String domain, CloudEvent event) {
throw new UnsupportedOperationException("Not implemented yet");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.EventExecutor;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -51,9 +51,6 @@ public ApplicationNotificationListener(ReactiveMessageListener receiver,
}

protected Mono<Void> setUpBindings(TopologyCreator creator) {
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(exchange(exchangeName)
.type("topic")
.durable(true));

final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(
queue(queueName)
Expand All @@ -65,6 +62,10 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
.flatMap(listener -> creator.bind(binding(exchangeName, listener.getPath(), queueName)));

if (createTopology) {
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(exchange(exchangeName)
.type("topic")
.durable(true));

return declareExchange
.then(declareQueue)
.thenMany(bindings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
}

private void onTerminate() {
messageFlux.doOnTerminate(this::onTerminate)
messageFlux
.doOnTerminate(this::onTerminate)
.subscribe(new LoggerSubscriber<>(getClass().getName()));
}

Expand Down
9 changes: 0 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@ plugins {
id 'co.com.bancolombia.cleanArchitecture' version '3.17.13'
}

sonar {
properties {
property 'sonar.projectKey', 'reactive-commons_reactive-commons-java'
property 'sonar.coverage.exclusions', 'samples/**/*'
property 'sonar.organization', 'reactive-commons'
property 'sonar.host.url', 'https://sonarcloud.io'
}
}

repositories {
mavenCentral()
}
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/reactive-commons/1-getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ spring:

You can also set it in runtime for example from a secret, so you can create the `RabbitProperties` bean like:

```java title="org.reactivecommons.async.rabbit.config.RabbitProperties"
```java title="org.reactivecommons.async.rabbit.standalone.config.RabbitProperties"

@Configuration
public class MyRabbitMQConfig {
Expand Down
8 changes: 7 additions & 1 deletion docs/docs/reactive-commons/9-configuration-properties.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ app:
createTopology: true # if your organization have restrictions with automatic topology creation you can set it to false and create it manually or by your organization process.
delayedCommands: false # Enable to send a delayed command to an external target
prefetchCount: 250 # is the maximum number of in flight messages you can reduce it to process less concurrent messages, this settings acts per instance of your service
useDiscardNotifierPerDomain: false # if true it uses a discard notifier for each domain,when false it uses a single discard notifier for all domains with default 'app' domain
enabled: true # if you want to disable this domain you can set it to false
brokerType: "rabbitmq" # please don't change this value
flux:
maxConcurrency: 250 # max concurrency of listener flow
domain:
Expand Down Expand Up @@ -64,7 +67,7 @@ You can override this settings programmatically through a `AsyncPropsDomainPrope
```java
package sample;

import org.reactivecommons.async.rabbit.config.RabbitProperties;
import org.reactivecommons.async.rabbit.standalone.config.RabbitProperties;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.AsyncRabbitPropsDomainProperties;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -133,6 +136,9 @@ reactive:
retryDelay: 1000 # interval for message retries, with and without DLQRetry
checkExistingTopics: true # if you don't want to verify topic existence before send a record you can set it to false
createTopology: true # if your organization have restrictions with automatic topology creation you can set it to false and create it manually or by your organization process.
useDiscardNotifierPerDomain: false # if true it uses a discard notifier for each domain,when false it uses a single discard notifier for all domains with default 'app' domain
enabled: true # if you want to disable this domain you can set it to false
brokerType: "kafka" # please don't change this value
domain:
ignoreThisListener: false # Allows you to disable event listener for this specific domain
connectionProperties: # you can override the connection properties of each domain
Expand Down
Loading
Loading