88import io .javaoperatorsdk .operator .processing .event .DefaultEventSourceManager ;
99import io .javaoperatorsdk .operator .processing .event .Event ;
1010import io .javaoperatorsdk .operator .processing .event .EventHandler ;
11- import java .util .HashSet ;
12- import java .util .Optional ;
13- import java .util .Set ;
1411import io .javaoperatorsdk .operator .processing .retry .Retry ;
1512import io .javaoperatorsdk .operator .processing .retry .RetryExecution ;
16- import org .slf4j .Logger ;
17- import org .slf4j .LoggerFactory ;
18-
1913import java .util .*;
14+ import java .util .HashSet ;
15+ import java .util .Optional ;
16+ import java .util .Set ;
2017import java .util .concurrent .ScheduledThreadPoolExecutor ;
2118import java .util .concurrent .ThreadFactory ;
2219import java .util .concurrent .locks .ReentrantLock ;
2320import org .slf4j .Logger ;
2421import org .slf4j .LoggerFactory ;
2522
26- import static io .javaoperatorsdk .operator .EventListUtils .containsCustomResourceDeletedEvent ;
27- import static io .javaoperatorsdk .operator .processing .KubernetesResourceUtils .getUID ;
28- import static io .javaoperatorsdk .operator .processing .KubernetesResourceUtils .getVersion ;
29-
3023/**
3124 * Event handler that makes sure that events are processed in a "single threaded" way per resource
3225 * UID, while buffering events which are received during an execution.
@@ -35,26 +28,32 @@ public class DefaultEventHandler implements EventHandler {
3528
3629 private static final Logger log = LoggerFactory .getLogger (DefaultEventHandler .class );
3730
38- private final CustomResourceCache customResourceCache ;
39- private final EventBuffer eventBuffer ;
40- private final Set <String > underProcessing = new HashSet <>();
41- private final ScheduledThreadPoolExecutor executor ;
42- private final EventDispatcher eventDispatcher ;
43- private final Retry retry ;
44- private final Map <String , RetryExecution > retryState = new HashMap <>();
45- private DefaultEventSourceManager defaultEventSourceManager ;
31+ private final CustomResourceCache customResourceCache ;
32+ private final EventBuffer eventBuffer ;
33+ private final Set <String > underProcessing = new HashSet <>();
34+ private final ScheduledThreadPoolExecutor executor ;
35+ private final EventDispatcher eventDispatcher ;
36+ private final Retry retry ;
37+ private final Map <String , RetryExecution > retryState = new HashMap <>();
38+ private DefaultEventSourceManager defaultEventSourceManager ;
4639
4740 private final ReentrantLock lock = new ReentrantLock ();
4841
49- public DefaultEventHandler (CustomResourceCache customResourceCache , EventDispatcher eventDispatcher , String relatedControllerName ,
50- Retry retry ) {
51- this .customResourceCache = customResourceCache ;
52- this .eventDispatcher = eventDispatcher ;
53- this .retry = retry ;
54- eventBuffer = new EventBuffer ();
55- executor = new ScheduledThreadPoolExecutor (5 , new ThreadFactory () {
56- @ Override
57- public Thread newThread (Runnable runnable ) {
42+ public DefaultEventHandler (
43+ CustomResourceCache customResourceCache ,
44+ EventDispatcher eventDispatcher ,
45+ String relatedControllerName ,
46+ Retry retry ) {
47+ this .customResourceCache = customResourceCache ;
48+ this .eventDispatcher = eventDispatcher ;
49+ this .retry = retry ;
50+ eventBuffer = new EventBuffer ();
51+ executor =
52+ new ScheduledThreadPoolExecutor (
53+ 5 ,
54+ new ThreadFactory () {
55+ @ Override
56+ public Thread newThread (Runnable runnable ) {
5857 return new Thread (runnable , "EventHandler-" + relatedControllerName );
5958 }
6059 });
@@ -101,80 +100,90 @@ private void executeBufferedEvents(String customResourceUid) {
101100 }
102101 }
103102
104- void eventProcessingFinished (ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
105- try {
106- lock . lock ();
107- log . debug ( "Event processing finished. Scope: {}" , executionScope );
108- unsetUnderExecution ( executionScope . getCustomResourceUid () );
109-
110- if ( retry != null && postExecutionControl . exceptionDuringExecution ()) {
111- handleRetryOnException ( executionScope , postExecutionControl );
112- } else if ( retry != null ) {
113- handleSuccessfulExecutionRegardingRetry ( executionScope );
114- }
115-
116- if ( containsCustomResourceDeletedEvent ( executionScope . getEvents ())) {
117- cleanupAfterDeletedEvent ( executionScope .getCustomResourceUid ());
118- } else {
119- cacheUpdatedResourceIfChanged ( executionScope , postExecutionControl );
120- executeBufferedEvents (executionScope . getCustomResourceUid () );
121- }
122- } finally {
123- lock . unlock ();
124- }
103+ void eventProcessingFinished (
104+ ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
105+ try {
106+ lock . lock ( );
107+ log . debug ( "Event processing finished. Scope: {}" , executionScope );
108+ unsetUnderExecution ( executionScope . getCustomResourceUid ());
109+
110+ if ( retry != null && postExecutionControl . exceptionDuringExecution ()) {
111+ handleRetryOnException ( executionScope , postExecutionControl );
112+ } else if ( retry != null ) {
113+ handleSuccessfulExecutionRegardingRetry ( executionScope );
114+ }
115+
116+ if ( containsCustomResourceDeletedEvent ( executionScope .getEvents ())) {
117+ cleanupAfterDeletedEvent ( executionScope . getCustomResourceUid ());
118+ } else {
119+ cacheUpdatedResourceIfChanged (executionScope , postExecutionControl );
120+ executeBufferedEvents ( executionScope . getCustomResourceUid ());
121+ }
122+ } finally {
123+ lock . unlock ();
125124 }
125+ }
126126
127- /**
128- * Regarding the events there are 2 approaches we can take. Either retry always when there are new events (received meanwhile retry
129- * is in place or already in buffer) instantly or always wait according to the retry timing if there was an exception.
130- */
131- private void handleRetryOnException (ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
132- RetryExecution execution = getOrInitRetryExecution (executionScope );
133- boolean newEventsExists = eventBuffer .newEventsExists (executionScope .getCustomResourceUid ());
134- eventBuffer .putBackEvents (executionScope .getCustomResourceUid (), executionScope .getEvents ());
135-
136- Optional <Long > nextDelay = execution .nextDelay ();
137- if (newEventsExists ) {
138- executeBufferedEvents (executionScope .getCustomResourceUid ());
139- return ;
140- }
141- nextDelay .ifPresent (delay ->
142- defaultEventSourceManager .getRetryTimerEventSource ()
143- .scheduleOnce (executionScope .getCustomResource (), delay ));
127+ /**
128+ * Regarding the events there are 2 approaches we can take. Either retry always when there are new
129+ * events (received meanwhile retry is in place or already in buffer) instantly or always wait
130+ * according to the retry timing if there was an exception.
131+ */
132+ private void handleRetryOnException (
133+ ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
134+ RetryExecution execution = getOrInitRetryExecution (executionScope );
135+ boolean newEventsExists = eventBuffer .newEventsExists (executionScope .getCustomResourceUid ());
136+ eventBuffer .putBackEvents (executionScope .getCustomResourceUid (), executionScope .getEvents ());
137+
138+ Optional <Long > nextDelay = execution .nextDelay ();
139+ if (newEventsExists ) {
140+ executeBufferedEvents (executionScope .getCustomResourceUid ());
141+ return ;
144142 }
143+ nextDelay .ifPresent (
144+ delay ->
145+ defaultEventSourceManager
146+ .getRetryTimerEventSource ()
147+ .scheduleOnce (executionScope .getCustomResource (), delay ));
148+ }
145149
146- private void handleSuccessfulExecutionRegardingRetry (ExecutionScope executionScope ) {
147- retryState .remove (executionScope .getCustomResourceUid ());
148- defaultEventSourceManager .getRetryTimerEventSource ().cancelOnceSchedule (executionScope .getCustomResourceUid ());
149- }
150+ private void handleSuccessfulExecutionRegardingRetry (ExecutionScope executionScope ) {
151+ retryState .remove (executionScope .getCustomResourceUid ());
152+ defaultEventSourceManager
153+ .getRetryTimerEventSource ()
154+ .cancelOnceSchedule (executionScope .getCustomResourceUid ());
155+ }
150156
151- private RetryExecution getOrInitRetryExecution (ExecutionScope executionScope ) {
152- RetryExecution retryExecution = retryState .get (executionScope .getCustomResourceUid ());
153- if (retryExecution == null ) {
154- retryExecution = retry .initExecution ();
155- retryState .put (executionScope .getCustomResourceUid (), retryExecution );
156- }
157- return retryExecution ;
157+ private RetryExecution getOrInitRetryExecution (ExecutionScope executionScope ) {
158+ RetryExecution retryExecution = retryState .get (executionScope .getCustomResourceUid ());
159+ if (retryExecution == null ) {
160+ retryExecution = retry .initExecution ();
161+ retryState .put (executionScope .getCustomResourceUid (), retryExecution );
158162 }
163+ return retryExecution ;
164+ }
159165
160- /**
161- * Here we try to cache the latest resource after an update. The goal is to solve a concurrency issue we've seen:
162- * If an execution is finished, where we updated a custom resource, but there are other events already buffered for next
163- * execution, we might not get the newest custom resource from CustomResource event source in time. Thus we execute
164- * the next batch of events but with a non up to date CR. Here we cache the latest CustomResource from the update
165- * execution so we make sure its already used in the up-coming execution.
166- * <p>
167- * Note that this is an improvement, not a bug fix. This situation can happen naturally, we just make the execution more
168- * efficient, and avoid questions about conflicts.
169- * <p>
170- * Note that without the conditional locking in the cache, there is a very minor chance that we would override an
171- * additional change coming from a different client.
172- */
173- private void cacheUpdatedResourceIfChanged (ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
174- if (postExecutionControl .customResourceUpdatedDuringExecution ()) {
175- CustomResource originalCustomResource = executionScope .getCustomResource ();
176- CustomResource customResourceAfterExecution = postExecutionControl .getUpdatedCustomResource ().get ();
177- String originalResourceVersion = getVersion (originalCustomResource );
166+ /**
167+ * Here we try to cache the latest resource after an update. The goal is to solve a concurrency
168+ * issue we've seen: If an execution is finished, where we updated a custom resource, but there
169+ * are other events already buffered for next execution, we might not get the newest custom
170+ * resource from CustomResource event source in time. Thus we execute the next batch of events but
171+ * with a non up to date CR. Here we cache the latest CustomResource from the update execution so
172+ * we make sure its already used in the up-coming execution.
173+ *
174+ * <p>Note that this is an improvement, not a bug fix. This situation can happen naturally, we
175+ * just make the execution more efficient, and avoid questions about conflicts.
176+ *
177+ * <p>Note that without the conditional locking in the cache, there is a very minor chance that we
178+ * would override an additional change coming from a different client.
179+ */
180+ private void cacheUpdatedResourceIfChanged (
181+ ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
182+ if (postExecutionControl .customResourceUpdatedDuringExecution ()) {
183+ CustomResource originalCustomResource = executionScope .getCustomResource ();
184+ CustomResource customResourceAfterExecution =
185+ postExecutionControl .getUpdatedCustomResource ().get ();
186+ String originalResourceVersion = getVersion (originalCustomResource );
178187
179188 log .debug (
180189 "Trying to update resource cache from update response for resource uid: {} new version: {} old version: {}" ,
0 commit comments