Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.api.DirectAsyncGateway;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.starter.config.health.RCHealth;
import org.reactivecommons.async.starter.props.GenericAsyncProps;
import org.springframework.boot.actuate.health.Health;
import reactor.core.publisher.Mono;

@SuppressWarnings("rawtypes")
Expand All @@ -25,5 +25,5 @@ public interface BrokerProvider<T extends GenericAsyncProps> {

void listenReplies(HandlerResolver resolver);

Mono<Health> healthCheck();
Mono<RCHealth> healthCheck();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.reactivecommons.async.starter.config.health;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;

import java.util.HashMap;
import java.util.Map;

@Getter
@Builder
@AllArgsConstructor
public class RCHealth {
private final Status status;
private final Map<String, Object> details;

public enum Status {
UP,
DOWN
}

public static class RCHealthBuilder {
public RCHealthBuilder() {
this.details = new HashMap<>();
}

public RCHealthBuilder up() {
this.status = Status.UP;
return this;
}

public RCHealthBuilder down() {
this.status = Status.DOWN;
return this;
}

public RCHealthBuilder withDetail(String key, Object value) {
this.details.put(key, value);
return this;
}

public RCHealth build() {
return new RCHealth(this.status, this.details);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.reactivecommons.async.starter.config.health;

import reactor.core.publisher.Mono;

public abstract class RCHealthIndicator {

public Mono<RCHealth> health() {
return doHealthCheck(RCHealth.builder())
.onErrorResume(e -> Mono.just(RCHealth.builder().down().withDetail("error", e.getMessage()).build()));
}

public abstract Mono<RCHealth> doHealthCheck(RCHealth.RCHealthBuilder builder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnClass(AbstractReactiveHealthIndicator.class)
public class ReactiveCommonsHealthConfig {

@Bean
@ConditionalOnProperty(prefix = "management.health.reactive-commons", name = "enabled", havingValue = "true",
matchIfMissing = true)
@ConditionalOnClass(AbstractReactiveHealthIndicator.class)
public ReactiveCommonsHealthIndicator reactiveCommonsHealthIndicator(ConnectionManager manager) {
return new ReactiveCommonsHealthIndicator(manager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.reactivecommons.async.starter.config.ConnectionManager;
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.reactivecommons.async.starter.config.ConnectionManager;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -22,14 +21,14 @@ public class ReactiveCommonsHealthIndicator extends AbstractReactiveHealthIndica
protected Mono<Health> doHealthCheck(Health.Builder builder) {
return Flux.fromIterable(manager.getProviders().values())
.flatMap(BrokerProvider::healthCheck)
.reduceWith(Health::up, (health, status) -> reduceHealth((Health.Builder) health, (Health) status))
.reduceWith(Health::up, (health, status) -> reduceHealth((Health.Builder) health, (RCHealth) status))
.map(b -> ((Health.Builder) b).build());

}

private Health.Builder reduceHealth(Health.Builder builder, Health status) {
private Health.Builder reduceHealth(Health.Builder builder, RCHealth status) {
String domain = status.getDetails().get(DOMAIN).toString();
if (status.getStatus().equals(Status.DOWN)) {
if (status.getStatus().equals(RCHealth.Status.DOWN)) {
log.error("Broker of domain {} is down", domain);
return builder.down().withDetail(domain, status.getDetails());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ void setUp() {
@Test
void shouldBeUp() {
// Arrange
when(brokerProvider.healthCheck()).thenReturn(Mono.just(Health.up()
when(brokerProvider.healthCheck()).thenReturn(Mono.just(RCHealth.builder().up()
.withDetail(DOMAIN, DEFAULT_DOMAIN)
.withDetail(VERSION, "123")
.build()));
when(brokerProvider2.healthCheck()).thenReturn(Mono.just(Health.up()
when(brokerProvider2.healthCheck()).thenReturn(Mono.just(RCHealth.builder().up()
.withDetail(DOMAIN, OTHER)
.withDetail(VERSION, "1234")
.build()));
Expand All @@ -56,11 +56,11 @@ void shouldBeUp() {
@Test
void shouldBeDown() {
// Arrange
when(brokerProvider.healthCheck()).thenReturn(Mono.just(Health.up()
when(brokerProvider.healthCheck()).thenReturn(Mono.just(RCHealth.builder().down()
.withDetail(DOMAIN, DEFAULT_DOMAIN)
.withDetail(VERSION, "123")
.build()));
when(brokerProvider2.healthCheck()).thenReturn(Mono.just(Health.down()
when(brokerProvider2.healthCheck()).thenReturn(Mono.just(RCHealth.builder().up()
.withDetail(DOMAIN, OTHER)
.withDetail(VERSION, "1234")
.build()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.reactivecommons.async.starter.broker.DiscardProvider;
import org.reactivecommons.async.starter.config.health.RCHealth;
import org.reactivecommons.async.starter.mybroker.props.MyBrokerAsyncProps;
import org.springframework.boot.actuate.health.Health;
import reactor.core.publisher.Mono;

@AllArgsConstructor
Expand Down Expand Up @@ -57,7 +57,7 @@ public void listenReplies(HandlerResolver resolver) {
}

@Override
public Mono<Health> healthCheck() {
public Mono<RCHealth> healthCheck() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.reactivecommons.async.kafka.listeners.ApplicationEventListener;
import org.reactivecommons.async.kafka.listeners.ApplicationNotificationsListener;
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.springframework.boot.actuate.health.Health;
import org.reactivecommons.async.starter.config.health.RCHealth;
import org.springframework.boot.ssl.SslBundles;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -100,7 +100,7 @@ public void listenReplies(HandlerResolver resolver) {
}

@Override
public Mono<Health> healthCheck() {
public Mono<RCHealth> healthCheck() {
return healthIndicator.health();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.reactivecommons.async.starter.config.health.RCHealth;
import org.reactivecommons.async.starter.config.health.RCHealthIndicator;
import reactor.core.publisher.Mono;

import static org.reactivecommons.async.starter.config.health.ReactiveCommonsHealthIndicator.DOMAIN;
import static org.reactivecommons.async.starter.config.health.ReactiveCommonsHealthIndicator.VERSION;

@Log4j2
@AllArgsConstructor
public class KafkaReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
public class KafkaReactiveHealthIndicator extends RCHealthIndicator {
private final String domain;
private final AdminClient adminClient;

@Override
protected Mono<Health> doHealthCheck(Health.Builder builder) {
public Mono<RCHealth> doHealthCheck(RCHealth.RCHealthBuilder builder) {
builder.withDetail(DOMAIN, domain);
return checkKafkaHealth()
.map(clusterId -> builder.up().withDetail(VERSION, clusterId).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.reactivecommons.async.kafka.converters.json.KafkaJacksonMessageConverter;
import org.reactivecommons.async.kafka.health.KafkaReactiveHealthIndicator;
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.springframework.boot.actuate.health.Health;
import org.reactivecommons.async.starter.config.health.RCHealth;
import org.springframework.boot.ssl.DefaultSslBundleRegistry;
import org.springframework.boot.ssl.SslBundles;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -136,12 +136,12 @@ void shouldListenNotificationEvents() {

@Test
void shouldProxyHealthCheck() {
when(healthIndicator.health()).thenReturn(Mono.fromSupplier(() -> Health.up().build()));
when(healthIndicator.health()).thenReturn(Mono.fromSupplier(() -> RCHealth.builder().up().build()));
// Act
Mono<Health> flow = brokerProvider.healthCheck();
Mono<RCHealth> flow = brokerProvider.healthCheck();
// Assert
StepVerifier.create(flow)
.expectNextMatches(health -> health.getStatus().getCode().equals("UP"))
.expectNextMatches(health -> health.getStatus().equals(RCHealth.Status.UP))
.verifyComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.reactivecommons.async.starter.config.health.RCHealth;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Health.Builder;
import org.springframework.boot.actuate.health.Status;
Expand Down Expand Up @@ -39,13 +40,13 @@ void shouldBeUp() {
when(adminClient.describeCluster()).thenReturn(describeClusterResult);
when(describeClusterResult.clusterId()).thenReturn(KafkaFuture.completedFuture("cluster123"));
// Act
Mono<Health> result = indicator.doHealthCheck(new Builder());
Mono<RCHealth> result = indicator.doHealthCheck(RCHealth.builder());
// Assert
StepVerifier.create(result)
.assertNext(health -> {
assertEquals(DEFAULT_DOMAIN, health.getDetails().get("domain"));
assertEquals("cluster123", health.getDetails().get("version"));
assertEquals(Status.UP, health.getStatus());
assertEquals(RCHealth.Status.UP, health.getStatus());
})
.verifyComplete();
}
Expand All @@ -58,12 +59,12 @@ void shouldBeDown() {
future.completeExceptionally(new RuntimeException("simulate error"));
when(describeClusterResult.clusterId()).thenReturn(future);
// Act
Mono<Health> result = indicator.doHealthCheck(new Builder());
Mono<RCHealth> result = indicator.doHealthCheck(RCHealth.builder());
// Assert
StepVerifier.create(result)
.expectNextMatches(health -> {
assertEquals(DEFAULT_DOMAIN, health.getDetails().get("domain"));
assertEquals(Status.DOWN, health.getStatus());
assertEquals(RCHealth.Status.DOWN, health.getStatus());
return true;
})
.verifyComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.reactivecommons.async.rabbit.listeners.ApplicationQueryListener;
import org.reactivecommons.async.rabbit.listeners.ApplicationReplyListener;
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.springframework.boot.actuate.health.Health;
import org.reactivecommons.async.starter.config.health.RCHealth;
import reactor.core.publisher.Mono;

import static reactor.rabbitmq.ExchangeSpecification.exchange;
Expand Down Expand Up @@ -154,7 +154,7 @@ public void listenReplies(HandlerResolver resolver) {
}

@Override
public Mono<Health> healthCheck() {
public Mono<RCHealth> healthCheck() {
return healthIndicator.health();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.reactivecommons.async.starter.config.health.RCHealth;
import org.reactivecommons.async.starter.config.health.RCHealthIndicator;
import reactor.core.publisher.Mono;

import java.net.SocketException;
Expand All @@ -14,7 +14,7 @@
import static org.reactivecommons.async.starter.config.health.ReactiveCommonsHealthIndicator.VERSION;

@Log4j2
public class RabbitReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
public class RabbitReactiveHealthIndicator extends RCHealthIndicator {
private final String domain;
private final ConnectionFactory connectionFactory;

Expand All @@ -25,7 +25,7 @@ public RabbitReactiveHealthIndicator(String domain, ConnectionFactory connection
}

@Override
protected Mono<Health> doHealthCheck(Health.Builder builder) {
public Mono<RCHealth> doHealthCheck(RCHealth.RCHealthBuilder builder) {
builder.withDetail(DOMAIN, domain);
return Mono.fromCallable(() -> getRawVersion(connectionFactory))
.map(status -> builder.up().withDetail(VERSION, status).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.reactivecommons.async.rabbit.converters.json.RabbitJacksonMessageConverter;
import org.reactivecommons.async.rabbit.health.RabbitReactiveHealthIndicator;
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.springframework.boot.actuate.health.Health;
import org.reactivecommons.async.starter.config.health.RCHealth;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.BindingSpecification;
Expand Down Expand Up @@ -181,12 +181,12 @@ void shouldListenQueries() {

@Test
void shouldProxyHealthCheck() {
when(healthIndicator.health()).thenReturn(Mono.fromSupplier(() -> Health.up().build()));
when(healthIndicator.health()).thenReturn(Mono.fromSupplier(() -> RCHealth.builder().up().build()));
// Act
Mono<Health> flow = brokerProvider.healthCheck();
Mono<RCHealth> flow = brokerProvider.healthCheck();
// Assert
StepVerifier.create(flow)
.expectNextMatches(health -> health.getStatus().getCode().equals("UP"))
.expectNextMatches(health -> health.getStatus().equals(RCHealth.Status.UP))
.verifyComplete();
}
}
Loading
Loading