55import com .devshawn .kafka .gitops .domain .plan .DesiredPlan ;
66import com .devshawn .kafka .gitops .domain .plan .PlanOverview ;
77import com .devshawn .kafka .gitops .domain .plan .TopicConfigPlan ;
8+ import com .devshawn .kafka .gitops .domain .plan .TopicDetailsPlan ;
89import com .devshawn .kafka .gitops .domain .plan .TopicPlan ;
910import com .devshawn .kafka .gitops .domain .state .AclDetails ;
1011import com .devshawn .kafka .gitops .domain .state .DesiredState ;
1112import com .devshawn .kafka .gitops .domain .state .TopicDetails ;
1213import com .devshawn .kafka .gitops .enums .PlanAction ;
1314import com .devshawn .kafka .gitops .exception .PlanIsUpToDateException ;
1415import com .devshawn .kafka .gitops .exception .ReadPlanInputException ;
16+ import com .devshawn .kafka .gitops .exception .ValidationException ;
1517import com .devshawn .kafka .gitops .exception .WritePlanOutputException ;
1618import com .devshawn .kafka .gitops .service .KafkaService ;
1719import com .devshawn .kafka .gitops .util .PlanUtil ;
1820import com .fasterxml .jackson .databind .ObjectMapper ;
1921import org .apache .kafka .clients .admin .Config ;
2022import org .apache .kafka .clients .admin .ConfigEntry ;
21- import org .apache .kafka .clients .admin .TopicListing ;
23+ import org .apache .kafka .clients .admin .TopicDescription ;
2224import org .apache .kafka .common .acl .AclBinding ;
2325import org .apache .kafka .common .config .ConfigResource ;
2426import org .slf4j .LoggerFactory ;
@@ -47,37 +49,74 @@ public PlanManager(ManagerConfig managerConfig, KafkaService kafkaService, Objec
4749 }
4850
4951 public void planTopics (DesiredState desiredState , DesiredPlan .Builder desiredPlan ) {
50- List < TopicListing > topics = kafkaService .getTopics ();
51- List <String > topicNames = topics .stream ().map (TopicListing :: name ).collect (Collectors .toList ());
52+ Map < String , TopicDescription > topics = kafkaService .getTopics ();
53+ List <String > topicNames = topics .entrySet (). stream ().map (Map . Entry :: getKey ).collect (Collectors .toList ());
5254 Map <String , List <ConfigEntry >> topicConfigs = fetchTopicConfigurations (topicNames );
5355
5456 desiredState .getTopics ().forEach ((key , value ) -> {
55- TopicPlan .Builder topicPlan = new TopicPlan .Builder ()
56- . setName ( key )
57- . setTopicDetails ( value );
57+ TopicDetailsPlan .Builder topicDetailsPlan = new TopicDetailsPlan .Builder ();
58+ topicDetailsPlan . setPartitionsAction ( PlanAction . NO_CHANGE )
59+ . setReplicationAction ( PlanAction . NO_CHANGE );
5860
61+ TopicPlan .Builder topicPlan = new TopicPlan .Builder ()
62+ .setName (key );
63+ boolean topicDetailsAddOrUpdate = false ;
5964 if (!topicNames .contains (key )) {
6065 log .info ("[PLAN] Topic {} does not exist; it will be created." , key );
6166 topicPlan .setAction (PlanAction .ADD );
67+ topicDetailsPlan .setPartitionsAction (PlanAction .ADD )
68+ .setPartitions (value .getPartitions ())
69+ .setReplicationAction (PlanAction .ADD )
70+ .setReplication (value .getReplication ().get ());
71+ planTopicConfigurations (key , value , topicConfigs .get (key ), topicPlan );
72+ topicDetailsAddOrUpdate = true ;
6273 } else {
6374 log .info ("[PLAN] Topic {} exists, it will not be created." , key );
75+ TopicDescription topicDescription = topics .get (key );
76+
6477 topicPlan .setAction (PlanAction .NO_CHANGE );
78+ topicDetailsPlan .setPartitions (topicDescription .partitions ().size ())
79+ .setReplication (topicDescription .partitions ().get (0 ).replicas ().size ());
80+
81+ if (value .getPartitions ().intValue () != topicDescription .partitions ().size ()) {
82+ if ( value .getPartitions ().intValue () < topicDescription .partitions ().size ()) {
83+ throw new ValidationException ("Removing the partition number is not supported by Apache Kafka "
84+ + "(topic: " + key + " (" +topicDescription .partitions ().size ()+" -> " +value .getPartitions ().intValue ()+"))" );
85+ }
86+ topicDetailsPlan .setPartitions (value .getPartitions ())
87+ .setPreviousPartitions (topicDescription .partitions ().size ());
88+ topicDetailsPlan .setPartitionsAction (PlanAction .UPDATE );
89+ topicDetailsAddOrUpdate = true ;
90+ }
91+ if (value .getReplication ().isPresent () &&
92+ ( value .getReplication ().get ().intValue () != topicDescription .partitions ().get (0 ).replicas ().size ()) ) {
93+ topicDetailsPlan .setReplication (value .getReplication ().get ())
94+ .setPreviousReplication (topicDescription .partitions ().get (0 ).replicas ().size ());
95+ topicDetailsPlan .setReplicationAction (PlanAction .UPDATE );
96+ topicDetailsAddOrUpdate = true ;
97+ }
98+ if (topicDetailsAddOrUpdate ) {
99+ topicPlan .setAction (PlanAction .UPDATE );
100+ }
101+
65102 planTopicConfigurations (key , value , topicConfigs .get (key ), topicPlan );
66103 }
67104
105+ topicPlan .setTopicDetailsPlan (topicDetailsPlan .build ());
106+
68107 desiredPlan .addTopicPlans (topicPlan .build ());
69108 });
70109
71- topics .forEach (currentTopic -> {
72- boolean shouldIgnore = desiredState .getPrefixedTopicsToIgnore ().stream ().anyMatch (it -> currentTopic . name () .startsWith (it ));
110+ topics .forEach (( currentTopicName , currentTopicDescription ) -> {
111+ boolean shouldIgnore = desiredState .getPrefixedTopicsToIgnore ().stream ().anyMatch (it -> currentTopicName .startsWith (it ));
73112 if (shouldIgnore ) {
74- log .info ("[PLAN] Ignoring topic {} due to prefix" , currentTopic . name () );
113+ log .info ("[PLAN] Ignoring topic {} due to prefix" , currentTopicName );
75114 return ;
76115 }
77116
78- if (!managerConfig .isDeleteDisabled () && desiredState .getTopics ().getOrDefault (currentTopic . name () , null ) == null ) {
117+ if (!managerConfig .isDeleteDisabled () && desiredState .getTopics ().getOrDefault (currentTopicName , null ) == null ) {
79118 TopicPlan topicPlan = new TopicPlan .Builder ()
80- .setName (currentTopic . name () )
119+ .setName (currentTopicName )
81120 .setAction (PlanAction .REMOVE )
82121 .build ();
83122
@@ -88,7 +127,7 @@ public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPla
88127
89128 private void planTopicConfigurations (String topicName , TopicDetails topicDetails , List <ConfigEntry > configs , TopicPlan .Builder topicPlan ) {
90129 Map <String , TopicConfigPlan > configPlans = new HashMap <>();
91- List <ConfigEntry > customConfigs = configs .stream ()
130+ List <ConfigEntry > customConfigs = configs == null ? new ArrayList <>() : configs .stream ()
92131 .filter (it -> it .source () == ConfigEntry .ConfigSource .DYNAMIC_TOPIC_CONFIG )
93132 .collect (Collectors .toList ());
94133
@@ -104,8 +143,11 @@ private void planTopicConfigurations(String topicName, TopicDetails topicDetails
104143 configPlans .put (currentConfig .name (), topicConfigPlan .build ());
105144 } else if (newConfig == null ) {
106145 topicConfigPlan .setAction (PlanAction .REMOVE );
146+ topicConfigPlan .setPreviousValue (currentConfig .value ());
107147 configPlans .put (currentConfig .name (), topicConfigPlan .build ());
108- topicPlan .setAction (PlanAction .UPDATE );
148+ if (topicPlan .getAction () == null || topicPlan .getAction ().equals (PlanAction .NO_CHANGE )) {
149+ topicPlan .setAction (PlanAction .UPDATE );
150+ }
109151 }
110152 });
111153
@@ -119,11 +161,16 @@ private void planTopicConfigurations(String topicName, TopicDetails topicDetails
119161 if (currentConfig == null ) {
120162 topicConfigPlan .setAction (PlanAction .ADD );
121163 configPlans .put (key , topicConfigPlan .build ());
122- topicPlan .setAction (PlanAction .UPDATE );
164+ if (topicPlan .getAction () == null || topicPlan .getAction ().equals (PlanAction .NO_CHANGE )) {
165+ topicPlan .setAction (PlanAction .UPDATE );
166+ }
123167 } else if (!currentConfig .value ().equals (value )) {
124- topicConfigPlan .setAction (PlanAction .UPDATE );
168+ topicConfigPlan .setPreviousValue (currentConfig .value ())
169+ .setAction (PlanAction .UPDATE );
125170 configPlans .put (key , topicConfigPlan .build ());
126- topicPlan .setAction (PlanAction .UPDATE );
171+ if (topicPlan .getAction () == null || topicPlan .getAction ().equals (PlanAction .NO_CHANGE )) {
172+ topicPlan .setAction (PlanAction .UPDATE );
173+ }
127174 }
128175 });
129176
@@ -203,7 +250,7 @@ public void writePlanToFile(DesiredPlan desiredPlan) {
203250 writer .write (objectMapper .writeValueAsString (outputPlan ));
204251 writer .close ();
205252 } catch (IOException ex ) {
206- throw new WritePlanOutputException (ex .getMessage ());
253+ throw new WritePlanOutputException (ex .getMessage () + " ('" + managerConfig . getPlanFile (). get () + "')" );
207254 }
208255 }
209256 }
0 commit comments