@@ -55,14 +55,13 @@ public void eventReceived(Watcher.Action action, CustomResource resource) {
5555 log .debug ("Event received for action: {}, {}: {}" , action .toString ().toLowerCase (), resource .getClass ().getSimpleName (),
5656 resource .getMetadata ().getName ());
5757 CustomResourceEvent event = new CustomResourceEvent (action , resource , retry );
58- scheduleEvent (event );
58+ scheduleEventFromApi (event );
5959 }
6060
61- void scheduleEvent (CustomResourceEvent event ) {
62- log .trace ("Current queue size {}" , executor .getQueue ().size ());
63- log .debug ("Scheduling event: {}" , event );
61+ void scheduleEventFromApi (CustomResourceEvent event ) {
6462 try {
6563 lock .lock ();
64+ log .debug ("Scheduling event from Api: {}" , event );
6665 if (event .getResource ().getMetadata ().getDeletionTimestamp () != null && event .getAction () == Action .DELETED ) {
6766 // Note that we always use finalizers, we want to process delete event just in corner case,
6867 // when we are not able to add finalizer (lets say because of optimistic locking error, and the resource was deleted instantly).
@@ -83,30 +82,39 @@ void scheduleEvent(CustomResourceEvent event) {
8382 eventStore .addOrReplaceEventAsNotScheduled (event );
8483 return ;
8584 }
85+ scheduleEventForExecution (event );
86+ log .trace ("Scheduling event from API finished: {}" , event );
87+ } finally {
88+ lock .unlock ();
89+ }
90+ }
8691
92+ private void scheduleEventForExecution (CustomResourceEvent event ) {
93+ try {
94+ lock .lock ();
95+ log .trace ("Current queue size {}" , executor .getQueue ().size ());
96+ log .debug ("Scheduling event for execution: {}" , event );
8797 Optional <Long > nextBackOff = event .nextBackOff ();
8898 if (!nextBackOff .isPresent ()) {
8999 log .warn ("Event max retry limit reached. Will be discarded. {}" , event );
90100 return ;
91101 }
92- log .debug ("Creating scheduled task for event: {}" , event );
93102 eventStore .addEventUnderProcessing (event );
94103 executor .schedule (new EventConsumer (event , eventDispatcher , this ),
95104 nextBackOff .get (), TimeUnit .MILLISECONDS );
105+ log .trace ("Scheduled task for event: {}" , event );
96106 } finally {
97- log .debug ("Scheduling event finished: {}" , event );
98107 lock .unlock ();
99108 }
100109 }
101110
102111 void eventProcessingFinishedSuccessfully (CustomResourceEvent event ) {
103112 try {
104113 lock .lock ();
105- log .debug ("Event processing successful for event: {}" , event );
106114 eventStore .removeEventUnderProcessing (event .resourceUid ());
107115 if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
108- log .debug ("Scheduling recent event for processing processing : {}" , event );
109- scheduleEvent ( eventStore . removeEventNotScheduled ( event .resourceUid () ));
116+ log .debug ("Scheduling recent event for processing: {}" , event );
117+ scheduleNotYetScheduledEventForExecution ( event .resourceUid ());
110118 }
111119 } finally {
112120 lock .unlock ();
@@ -118,19 +126,22 @@ void eventProcessingFailed(CustomResourceEvent event) {
118126 lock .lock ();
119127 eventStore .removeEventUnderProcessing (event .resourceUid ());
120128 if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
121- CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (event .resourceUid ());
122- log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}," +
123- " Most recent event: {}" , event , notScheduledEvent );
124- scheduleEvent (notScheduledEvent );
129+ log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}" , event );
130+ scheduleNotYetScheduledEventForExecution (event .resourceUid ());
125131 } else {
126132 log .debug ("Event processing failed. Attempting to re-schedule the event: {}" , event );
127- scheduleEvent (event );
133+ scheduleEventForExecution (event );
128134 }
129135 } finally {
130136 lock .unlock ();
131137 }
132138 }
133139
140+ private void scheduleNotYetScheduledEventForExecution (String uuid ) {
141+ CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (uuid );
142+ scheduleEventForExecution (notScheduledEvent );
143+ }
144+
134145 @ Override
135146 public void onClose (KubernetesClientException e ) {
136147 log .error ("Error: " , e );
0 commit comments