22
33
44import com .github .containersolutions .operator .processing .retry .Retry ;
5- import com .google .common .util .concurrent .ThreadFactoryBuilder ;
65import io .fabric8 .kubernetes .client .CustomResource ;
76import io .fabric8 .kubernetes .client .KubernetesClientException ;
87import io .fabric8 .kubernetes .client .Watcher ;
1110
1211import java .util .Optional ;
1312import java .util .concurrent .ScheduledThreadPoolExecutor ;
14- import java .util .concurrent .ThreadFactory ;
1513import java .util .concurrent .TimeUnit ;
1614import java .util .concurrent .locks .ReentrantLock ;
1715
3230 * <li>Threading approach thus thread pool size and/or implementation should be configurable</li>
3331 * </ul>
3432 * <p>
35- * Notes:
36- * <ul>
37- * <li> In implementation we have to lock since the fabric8 client event handling is multi-threaded, we can receive multiple events
38- * for same resource. Also we do callback from other threads.
39- * </li>
40- * </ul>
4133 */
4234
4335public class EventScheduler implements Watcher <CustomResource > {
@@ -54,11 +46,7 @@ public class EventScheduler implements Watcher<CustomResource> {
5446 public EventScheduler (EventDispatcher eventDispatcher , Retry retry ) {
5547 this .eventDispatcher = eventDispatcher ;
5648 this .retry = retry ;
57- ThreadFactory threadFactory = new ThreadFactoryBuilder ()
58- .setNameFormat ("event-consumer-%d" )
59- .setDaemon (false )
60- .build ();
61- executor = new ScheduledThreadPoolExecutor (1 , threadFactory );
49+ executor = new ScheduledThreadPoolExecutor (1 );
6250 executor .setRemoveOnCancelPolicy (true );
6351 }
6452
@@ -67,14 +55,13 @@ public void eventReceived(Watcher.Action action, CustomResource resource) {
6755 log .debug ("Event received for action: {}, {}: {}" , action .toString ().toLowerCase (), resource .getClass ().getSimpleName (),
6856 resource .getMetadata ().getName ());
6957 CustomResourceEvent event = new CustomResourceEvent (action , resource , retry );
70- scheduleEvent (event );
58+ scheduleEventFromApi (event );
7159 }
7260
73- void scheduleEvent (CustomResourceEvent event ) {
74- log .trace ("Current queue size {}" , executor .getQueue ().size ());
75- log .debug ("Scheduling event: {}" , event );
61+ void scheduleEventFromApi (CustomResourceEvent event ) {
7662 try {
7763 lock .lock ();
64+ log .debug ("Scheduling event from Api: {}" , event );
7865 if (event .getResource ().getMetadata ().getDeletionTimestamp () != null && event .getAction () == Action .DELETED ) {
7966 // Note that we always use finalizers, we want to process delete event just in corner case,
8067 // when we are not able to add finalizer (lets say because of optimistic locking error, and the resource was deleted instantly).
@@ -95,30 +82,39 @@ void scheduleEvent(CustomResourceEvent event) {
9582 eventStore .addOrReplaceEventAsNotScheduled (event );
9683 return ;
9784 }
85+ scheduleEventForExecution (event );
86+ log .trace ("Scheduling event from API finished: {}" , event );
87+ } finally {
88+ lock .unlock ();
89+ }
90+ }
9891
92+ private void scheduleEventForExecution (CustomResourceEvent event ) {
93+ try {
94+ lock .lock ();
95+ log .trace ("Current queue size {}" , executor .getQueue ().size ());
96+ log .debug ("Scheduling event for execution: {}" , event );
9997 Optional <Long > nextBackOff = event .nextBackOff ();
10098 if (!nextBackOff .isPresent ()) {
10199 log .warn ("Event max retry limit reached. Will be discarded. {}" , event );
102100 return ;
103101 }
104- log .debug ("Creating scheduled task for event: {}" , event );
105102 eventStore .addEventUnderProcessing (event );
106103 executor .schedule (new EventConsumer (event , eventDispatcher , this ),
107104 nextBackOff .get (), TimeUnit .MILLISECONDS );
105+ log .trace ("Scheduled task for event: {}" , event );
108106 } finally {
109- log .debug ("Scheduling event finished: {}" , event );
110107 lock .unlock ();
111108 }
112109 }
113110
114111 void eventProcessingFinishedSuccessfully (CustomResourceEvent event ) {
115112 try {
116113 lock .lock ();
117- log .debug ("Event processing successful for event: {}" , event );
118114 eventStore .removeEventUnderProcessing (event .resourceUid ());
119115 if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
120- log .debug ("Scheduling recent event for processing processing : {}" , event );
121- scheduleEvent ( eventStore . removeEventNotScheduled ( event .resourceUid () ));
116+ log .debug ("Scheduling recent event for processing: {}" , event );
117+ scheduleNotYetScheduledEventForExecution ( event .resourceUid ());
122118 }
123119 } finally {
124120 lock .unlock ();
@@ -130,19 +126,22 @@ void eventProcessingFailed(CustomResourceEvent event) {
130126 lock .lock ();
131127 eventStore .removeEventUnderProcessing (event .resourceUid ());
132128 if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
133- CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (event .resourceUid ());
134- log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}," +
135- " Most recent event: {}" , event , notScheduledEvent );
136- scheduleEvent (notScheduledEvent );
129+ log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}" , event );
130+ scheduleNotYetScheduledEventForExecution (event .resourceUid ());
137131 } else {
138132 log .debug ("Event processing failed. Attempting to re-schedule the event: {}" , event );
139- scheduleEvent (event );
133+ scheduleEventForExecution (event );
140134 }
141135 } finally {
142136 lock .unlock ();
143137 }
144138 }
145139
140+ private void scheduleNotYetScheduledEventForExecution (String uuid ) {
141+ CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (uuid );
142+ scheduleEventForExecution (notScheduledEvent );
143+ }
144+
146145 @ Override
147146 public void onClose (KubernetesClientException e ) {
148147 log .error ("Error: " , e );
0 commit comments