Skip to content

Commit d92ad3b

Browse files
author
Jerome REVILLARD
committed
Merge branch 'UpdateTopicReplication' into 'master'
Update topic replication See merge request BigeYs/infrastructure/kafka-gitops!1
2 parents a7f3b13 + 634156c commit d92ad3b

File tree

67 files changed

+1039
-282
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+1039
-282
lines changed

.gitignore

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
.classpath
2+
.factorypath
3+
.project
4+
.settings/
5+
bin/
16
.idea/
27
*.iml
38
.gradle/
@@ -6,4 +11,6 @@ out/
611
docker/data/
712
state.yaml
813
plan.json
9-
test.py
14+
test.py
15+
/generated/
16+
/.apt_generated/

.gitlab-ci.yml

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
2+
stages:
3+
- package
4+
- deploy
5+
6+
include:
7+
- project: 'devops/containers/kaniko-docker-build'
8+
ref: v1.0.2
9+
file: '/.build_with_kaniko.yml'
10+
11+
######################## TEMPLATES ######################
12+
.build-docker-image:
13+
14+
extends: .build_with_kaniko
15+
except:
16+
refs:
17+
- tags
18+
- master
19+
20+
.push-docker-image:
21+
extends: .build_with_kaniko
22+
only:
23+
refs:
24+
- tags
25+
- master
26+
27+
######################## JOBS ########################
28+
build-docker:
29+
stage: package
30+
extends: .build-docker-image
31+
variables:
32+
KANIKO_EXECUTOR_EXTRA_OPTS: "--no-push"
33+
cache:
34+
key: $CI_PIPELINE_ID
35+
paths:
36+
- .m2/repository
37+
- target
38+
policy: pull
39+
40+
push-docker:
41+
stage: deploy
42+
extends: .push-docker-image
43+
variables:
44+
KANIKO_EXECUTOR_EXTRA_OPTS: ""
45+
cache:
46+
key: $CI_PIPELINE_ID
47+
paths:
48+
- .m2/repository
49+
- target
50+
policy: pull
51+

Dockerfile

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
FROM openjdk:8-jre-slim
1+
FROM gradle:4.7.0-jdk8-alpine AS build
2+
COPY --chown=gradle:gradle . /home/gradle/src
3+
WORKDIR /home/gradle/src
4+
RUN gradle clean build buildRelease -x test --no-daemon
25

6+
FROM openjdk:8-jre-slim
37
RUN apt-get update && apt-get --yes upgrade && \
48
apt-get install -y python3 python3-pip curl && \
59
rm -rf /var/lib/apt/lists/*
610

7-
COPY ./build/output/kafka-gitops /usr/local/bin/kafka-gitops
11+
COPY --from=build /home/gradle/src/build/output/kafka-gitops /usr/local/bin/kafka-gitops

Dockerfile.local

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
FROM openjdk:8-jre-slim
2+
3+
RUN apt-get update && apt-get --yes upgrade && \
4+
apt-get install -y python3 python3-pip curl && \
5+
rm -rf /var/lib/apt/lists/*
6+
7+
COPY ./build/output/kafka-gitops /usr/local/bin/kafka-gitops

docker/docker-compose.yml

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ services:
1515
- ./data/zoo1/datalog:/datalog
1616

1717
kafka1:
18-
image: confluentinc/cp-kafka:5.3.1
18+
image: confluentinc/cp-kafka:5.5.3
1919
hostname: kafka1
2020
ports:
2121
- "9092:9092"
@@ -38,4 +38,57 @@ services:
3838
- ./data/kafka1/data:/var/lib/kafka/data
3939
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
4040
depends_on:
41-
- zoo1
41+
- zoo1
42+
43+
kafka2:
44+
image: confluentinc/cp-kafka:5.5.3
45+
hostname: kafka2
46+
ports:
47+
- "9093:9093"
48+
environment:
49+
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
50+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:SASL_PLAINTEXT,LISTENER_DOCKER_EXTERNAL:SASL_PLAINTEXT
51+
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
52+
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
53+
KAFKA_BROKER_ID: 2
54+
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
55+
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
56+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
57+
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
58+
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
59+
ZOOKEEPER_SASL_ENABLED: "false"
60+
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.auth.SimpleAclAuthorizer"
61+
KAFKA_SUPER_USERS: "User:test;User:kafka"
62+
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
63+
volumes:
64+
- ./data/kafka2/data:/var/lib/kafka/data
65+
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
66+
depends_on:
67+
- zoo1
68+
69+
kafka3:
70+
image: confluentinc/cp-kafka:5.5.3
71+
hostname: kafka3
72+
ports:
73+
- "9094:9094"
74+
environment:
75+
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
76+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:SASL_PLAINTEXT,LISTENER_DOCKER_EXTERNAL:SASL_PLAINTEXT
77+
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
78+
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
79+
KAFKA_BROKER_ID: 3
80+
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
81+
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
82+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
83+
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
84+
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
85+
ZOOKEEPER_SASL_ENABLED: "false"
86+
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.auth.SimpleAclAuthorizer"
87+
KAFKA_SUPER_USERS: "User:test;User:kafka"
88+
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
89+
volumes:
90+
- ./data/kafka3/data:/var/lib/kafka/data
91+
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
92+
depends_on:
93+
- zoo1
94+

src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicConfigPlan.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public interface TopicConfigPlan {
1313
String getKey();
1414

1515
Optional<String> getValue();
16+
17+
Optional<String> getPreviousValue();
1618

1719
PlanAction getAction();
1820

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.devshawn.kafka.gitops.domain.plan;
2+
3+
import java.util.Optional;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
import com.devshawn.kafka.gitops.enums.PlanAction;
6+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = TopicDetailsPlan.Builder.class)
10+
public interface TopicDetailsPlan {
11+
Optional<Integer> getPartitions();
12+
Optional<Integer> getPreviousPartitions();
13+
PlanAction getPartitionsAction();
14+
15+
Optional<Integer> getReplication();
16+
Optional<Integer> getPreviousReplication();
17+
PlanAction getReplicationAction();
18+
19+
public static Optional<TopicDetailsPlan> toChangesOnlyPlan(Optional<TopicDetailsPlan> topicDetailsPlan) {
20+
if(! topicDetailsPlan.isPresent()) {
21+
return topicDetailsPlan;
22+
}
23+
TopicDetailsPlan.Builder builder = new TopicDetailsPlan.Builder();
24+
builder.setReplicationAction(topicDetailsPlan.get().getReplicationAction());
25+
builder.setPartitionsAction(topicDetailsPlan.get().getPartitionsAction());
26+
boolean nochanges = true;
27+
if ( topicDetailsPlan.get().getReplicationAction() != null && ! topicDetailsPlan.get().getReplicationAction().equals(PlanAction.NO_CHANGE)) {
28+
builder.setReplication(topicDetailsPlan.get().getReplication());
29+
builder.setPreviousReplication(topicDetailsPlan.get().getPreviousReplication());
30+
nochanges = false;
31+
}
32+
if (topicDetailsPlan.get().getPartitionsAction() != null && ! topicDetailsPlan.get().getPartitionsAction().equals(PlanAction.NO_CHANGE)) {
33+
builder.setPartitions(topicDetailsPlan.get().getPartitions());
34+
builder.setPreviousPartitions(topicDetailsPlan.get().getPreviousPartitions());
35+
nochanges = false;
36+
}
37+
if(nochanges) {
38+
return Optional.<TopicDetailsPlan>empty();
39+
}
40+
return Optional.of(builder.build());
41+
}
42+
class Builder extends TopicDetailsPlan_Builder {
43+
}
44+
}

src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicPlan.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.devshawn.kafka.gitops.domain.plan;
22

3-
import com.devshawn.kafka.gitops.domain.state.TopicDetails;
43
import com.devshawn.kafka.gitops.enums.PlanAction;
54
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
65
import org.inferred.freebuilder.FreeBuilder;
@@ -16,12 +15,15 @@ public interface TopicPlan {
1615

1716
PlanAction getAction();
1817

19-
Optional<TopicDetails> getTopicDetails();
18+
Optional<TopicDetailsPlan> getTopicDetailsPlan();
2019

2120
List<TopicConfigPlan> getTopicConfigPlans();
2221

2322
default TopicPlan toChangesOnlyPlan() {
24-
TopicPlan.Builder builder = new TopicPlan.Builder().setName(getName()).setAction(getAction()).setTopicDetails(getTopicDetails());
23+
TopicPlan.Builder builder = new TopicPlan.Builder().setName(getName()).setAction(getAction());
24+
25+
builder.setTopicDetailsPlan(TopicDetailsPlan.toChangesOnlyPlan(getTopicDetailsPlan()));
26+
2527
getTopicConfigPlans().stream().filter(it -> !it.getAction().equals(PlanAction.NO_CHANGE)).forEach(builder::addTopicConfigPlans);
2628
return builder.build();
2729
}

src/main/java/com/devshawn/kafka/gitops/domain/state/AclDetails.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.devshawn.kafka.gitops.domain.state;
22

3-
import com.devshawn.kafka.gitops.exception.ValidationException;
43
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
54
import org.apache.kafka.common.acl.AccessControlEntry;
65
import org.apache.kafka.common.acl.AclBinding;
@@ -11,10 +10,6 @@
1110
import org.apache.kafka.common.resource.ResourceType;
1211
import org.inferred.freebuilder.FreeBuilder;
1312

14-
import java.util.Arrays;
15-
import java.util.List;
16-
import java.util.stream.Collectors;
17-
1813
@FreeBuilder
1914
@JsonDeserialize(builder = AclDetails.Builder.class)
2015
public abstract class AclDetails {

src/main/java/com/devshawn/kafka/gitops/manager/ApplyManager.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
import com.devshawn.kafka.gitops.config.ManagerConfig;
44
import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
55
import com.devshawn.kafka.gitops.domain.plan.TopicConfigPlan;
6+
import com.devshawn.kafka.gitops.domain.plan.TopicDetailsPlan;
67
import com.devshawn.kafka.gitops.domain.plan.TopicPlan;
78
import com.devshawn.kafka.gitops.enums.PlanAction;
89
import com.devshawn.kafka.gitops.service.KafkaService;
910
import com.devshawn.kafka.gitops.util.LogUtil;
1011
import org.apache.kafka.clients.admin.AlterConfigOp;
1112
import org.apache.kafka.clients.admin.ConfigEntry;
13+
import org.apache.kafka.common.Node;
1214
import org.apache.kafka.common.config.ConfigResource;
1315

1416
import java.util.*;
@@ -24,13 +26,25 @@ public ApplyManager(ManagerConfig managerConfig, KafkaService kafkaService) {
2426
}
2527

2628
public void applyTopics(DesiredPlan desiredPlan) {
29+
Collection<Node> clusterNodes = kafkaService.describeClusterNodes();
2730
desiredPlan.getTopicPlans().forEach(topicPlan -> {
2831
if (topicPlan.getAction() == PlanAction.ADD) {
2932
LogUtil.printTopicPreApply(topicPlan);
30-
kafkaService.createTopic(topicPlan.getName(), topicPlan.getTopicDetails().get());
33+
kafkaService.createTopic(topicPlan.getName(), topicPlan.getTopicDetailsPlan().get(), topicPlan.getTopicConfigPlans());
3134
LogUtil.printPostApply();
3235
} else if (topicPlan.getAction() == PlanAction.UPDATE) {
3336
LogUtil.printTopicPreApply(topicPlan);
37+
38+
if(topicPlan.getTopicDetailsPlan().isPresent()) {
39+
// Update Replication factor and partition number
40+
TopicDetailsPlan topicDetailsPlan = topicPlan.getTopicDetailsPlan().get();
41+
if(topicDetailsPlan.getPartitionsAction() == PlanAction.UPDATE) {
42+
kafkaService.addTopicPartition(topicPlan.getName(), topicDetailsPlan.getPartitions().get());
43+
}
44+
if(topicDetailsPlan.getReplicationAction() == PlanAction.UPDATE) {
45+
kafkaService.updateTopicReplication(clusterNodes, topicPlan.getName(), topicDetailsPlan.getReplication().get());
46+
}
47+
}
3448
topicPlan.getTopicConfigPlans().forEach(topicConfigPlan -> applyTopicConfiguration(topicPlan, topicConfigPlan));
3549
LogUtil.printPostApply();
3650
} else if (topicPlan.getAction() == PlanAction.REMOVE && !managerConfig.isDeleteDisabled()) {

0 commit comments

Comments
 (0)