11package io .javaoperatorsdk .operator ;
22
3+ import java .io .Closeable ;
34import java .io .IOException ;
45import java .net .ConnectException ;
56import java .util .LinkedList ;
@@ -25,8 +26,7 @@ public class Operator implements AutoCloseable {
2526 private static final Logger log = LoggerFactory .getLogger (Operator .class );
2627 private final KubernetesClient k8sClient ;
2728 private final ConfigurationService configurationService ;
28- private final List <ConfiguredController > controllers = new LinkedList <>();
29- private volatile boolean started = false ;
29+ private final ControllerManager controllers = new ControllerManager ();
3030
3131 public Operator (KubernetesClient k8sClient , ConfigurationService configurationService ) {
3232 this .k8sClient = k8sClient ;
@@ -62,14 +62,8 @@ public ConfigurationService getConfigurationService() {
6262 * where there is no obvious entrypoint to the application which can trigger the injection process
6363 * and start the cluster monitoring processes.
6464 */
65- @ SuppressWarnings ("unchecked" )
6665 public void start () {
67- if (started ) {
68- return ;
69- }
70- if (controllers .isEmpty ()) {
71- throw new OperatorException ("No ResourceController exists. Exiting!" );
72- }
66+ controllers .shouldStart ();
7367
7468 final var version = configurationService .getVersion ();
7569 log .info (
@@ -95,8 +89,7 @@ public void start() {
9589 throw new OperatorException (error , e );
9690 }
9791
98- controllers .parallelStream ().forEach (ConfiguredController ::start );
99- started = true ;
92+ controllers .start ();
10093 }
10194
10295 /** Stop the operator. */
@@ -105,20 +98,7 @@ public void close() {
10598 log .info (
10699 "Operator SDK {} is shutting down..." , configurationService .getVersion ().getSdkVersion ());
107100
108- if (!started ) {
109- return ;
110- }
111-
112- this .controllers .parallelStream ().forEach (closeable -> {
113- try {
114- log .debug ("closing {}" , closeable );
115- closeable .close ();
116- } catch (IOException e ) {
117- log .warn ("Error closing {}" , closeable , e );
118- }
119- });
120-
121- started = false ;
101+ controllers .close ();
122102 }
123103
124104 /**
@@ -164,10 +144,7 @@ public <R extends CustomResource> void register(
164144 }
165145 final var configuredController =
166146 new ConfiguredController (controller , configuration , k8sClient );
167- this .controllers .add (configuredController );
168- if (started ) {
169- configuredController .start ();
170- }
147+ controllers .add (configuredController );
171148
172149 final var watchedNS =
173150 configuration .watchAllNamespaces ()
@@ -180,4 +157,49 @@ public <R extends CustomResource> void register(
180157 watchedNS );
181158 }
182159 }
160+
161+ private static class ControllerManager implements Closeable {
162+ private final List <ConfiguredController > controllers = new LinkedList <>();
163+ private boolean started = false ;
164+
165+
166+ public synchronized void shouldStart () {
167+ if (started ) {
168+ return ;
169+ }
170+ if (controllers .isEmpty ()) {
171+ throw new OperatorException ("No ResourceController exists. Exiting!" );
172+ }
173+ }
174+
175+ public synchronized void start () {
176+ controllers .parallelStream ().forEach (ConfiguredController ::start );
177+ started = true ;
178+ }
179+
180+ @ Override
181+ public synchronized void close () {
182+ if (!started ) {
183+ return ;
184+ }
185+
186+ this .controllers .parallelStream ().forEach (closeable -> {
187+ try {
188+ log .debug ("closing {}" , closeable );
189+ closeable .close ();
190+ } catch (IOException e ) {
191+ log .warn ("Error closing {}" , closeable , e );
192+ }
193+ });
194+
195+ started = false ;
196+ }
197+
198+ public synchronized void add (ConfiguredController configuredController ) {
199+ this .controllers .add (configuredController );
200+ if (started ) {
201+ configuredController .start ();
202+ }
203+ }
204+ }
183205}
0 commit comments