11package io .javaoperatorsdk .operator ;
22
3- import java .io .Closeable ;
43import java .io .IOException ;
5- import java .util .ArrayList ;
4+ import java .util .LinkedList ;
65import java .util .List ;
6+ import java .util .concurrent .locks .ReentrantLock ;
77
88import org .slf4j .Logger ;
99import org .slf4j .LoggerFactory ;
@@ -24,16 +24,13 @@ public class Operator implements AutoCloseable {
2424 private static final Logger log = LoggerFactory .getLogger (Operator .class );
2525 private final KubernetesClient k8sClient ;
2626 private final ConfigurationService configurationService ;
27- private final Object lock ;
28- private final List <ConfiguredController > controllers ;
29- private volatile boolean started ;
27+ private final ReentrantLock lock = new ReentrantLock () ;
28+ private final List <ConfiguredController > controllers = new LinkedList <>() ;
29+ private volatile boolean started = false ;
3030
3131 public Operator (KubernetesClient k8sClient , ConfigurationService configurationService ) {
3232 this .k8sClient = k8sClient ;
3333 this .configurationService = configurationService ;
34- this .lock = new Object ();
35- this .controllers = new ArrayList <>();
36- this .started = false ;
3734 DefaultEventHandler .setEventMonitor (new EventMonitor () {
3835 @ Override
3936 public void processedEvent (String uid , Event event ) {
@@ -67,10 +64,14 @@ public ConfigurationService getConfigurationService() {
6764 */
6865 @ SuppressWarnings ("unchecked" )
6966 public void start () {
70- synchronized (lock ) {
67+ try {
68+ lock .lock ();
7169 if (started ) {
7270 return ;
7371 }
72+ if (controllers .isEmpty ()) {
73+ throw new OperatorException ("No ResourceController exists. Exiting!" );
74+ }
7475
7576 final var version = configurationService .getVersion ();
7677 log .info (
@@ -79,10 +80,6 @@ public void start() {
7980 version .getCommit (),
8081 version .getBuiltTime ());
8182
82- if (controllers .isEmpty ()) {
83- throw new OperatorException ("No ResourceController exists. Exiting!" );
84- }
85-
8683 log .info ("Client version: {}" , Version .clientVersion ());
8784 try {
8885 final var k8sVersion = k8sClient .getVersion ();
@@ -94,33 +91,37 @@ public void start() {
9491 throw new OperatorException ("Error retrieving the server version" , e );
9592 }
9693
97- controllers .forEach (ConfiguredController ::start );
98-
94+ controllers .parallelStream ().forEach (ConfiguredController ::start );
9995 started = true ;
96+ } finally {
97+ lock .unlock ();
10098 }
10199 }
102100
103101 /** Stop the operator. */
104102 @ Override
105103 public void close () {
106- synchronized (lock ) {
104+ log .info (
105+ "Operator SDK {} is shutting down..." , configurationService .getVersion ().getSdkVersion ());
106+
107+ try {
108+ lock .lock ();
107109 if (!started ) {
108110 return ;
109111 }
110112
111- log .info (
112- "Operator SDK {} is shutting down..." , configurationService .getVersion ().getSdkVersion ());
113-
114- for (Closeable closeable : this .controllers ) {
113+ this .controllers .parallelStream ().forEach (closeable -> {
115114 try {
116115 log .debug ("closing {}" , closeable );
117116 closeable .close ();
118117 } catch (IOException e ) {
119118 log .warn ("Error closing {}" , closeable , e );
120119 }
121- }
120+ });
122121
123122 started = false ;
123+ } finally {
124+ lock .unlock ();
124125 }
125126 }
126127
0 commit comments