@@ -61,14 +61,13 @@ public void eventReceived(Watcher.Action action, CustomResource resource) {
6161 log .debug ("Event received for action: {}, {}: {}" , action .toString ().toLowerCase (), resource .getClass ().getSimpleName (),
6262 resource .getMetadata ().getName ());
6363 CustomResourceEvent event = new CustomResourceEvent (action , resource , retry );
64- scheduleEvent (event );
64+ scheduleEventFromApi (event );
6565 }
6666
67- void scheduleEvent (CustomResourceEvent event ) {
68- log .trace ("Current queue size {}" , executor .getQueue ().size ());
69- log .debug ("Scheduling event: {}" , event );
67+ void scheduleEventFromApi (CustomResourceEvent event ) {
7068 try {
7169 lock .lock ();
70+ log .debug ("Scheduling event from Api: {}" , event );
7271 if (event .getResource ().getMetadata ().getDeletionTimestamp () != null && event .getAction () == Action .DELETED ) {
7372 // Note that we always use finalizers, we want to process delete event just in corner case,
7473 // when we are not able to add finalizer (lets say because of optimistic locking error, and the resource was deleted instantly).
@@ -89,30 +88,39 @@ void scheduleEvent(CustomResourceEvent event) {
8988 eventStore .addOrReplaceEventAsNotScheduled (event );
9089 return ;
9190 }
91+ scheduleEventForExecution (event );
92+ log .trace ("Scheduling event from API finished: {}" , event );
93+ } finally {
94+ lock .unlock ();
95+ }
96+ }
9297
98+ private void scheduleEventForExecution (CustomResourceEvent event ) {
99+ try {
100+ lock .lock ();
101+ log .trace ("Current queue size {}" , executor .getQueue ().size ());
102+ log .debug ("Scheduling event for execution: {}" , event );
93103 Optional <Long > nextBackOff = event .nextBackOff ();
94104 if (!nextBackOff .isPresent ()) {
95105 log .warn ("Event max retry limit reached. Will be discarded. {}" , event );
96106 return ;
97107 }
98- log .debug ("Creating scheduled task for event: {}" , event );
99108 eventStore .addEventUnderProcessing (event );
100109 executor .schedule (new EventConsumer (event , eventDispatcher , this ),
101110 nextBackOff .get (), TimeUnit .MILLISECONDS );
111+ log .trace ("Scheduled task for event: {}" , event );
102112 } finally {
103- log .debug ("Scheduling event finished: {}" , event );
104113 lock .unlock ();
105114 }
106115 }
107116
108117 void eventProcessingFinishedSuccessfully (CustomResourceEvent event ) {
109118 try {
110119 lock .lock ();
111- log .debug ("Event processing successful for event: {}" , event );
112120 eventStore .removeEventUnderProcessing (event .resourceUid ());
113121 if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
114- log .debug ("Scheduling recent event for processing processing : {}" , event );
115- scheduleEvent ( eventStore . removeEventNotScheduled ( event .resourceUid () ));
122+ log .debug ("Scheduling recent event for processing: {}" , event );
123+ scheduleNotYetScheduledEventForExecution ( event .resourceUid ());
116124 }
117125 } finally {
118126 lock .unlock ();
@@ -124,19 +132,22 @@ void eventProcessingFailed(CustomResourceEvent event) {
124132 lock .lock ();
125133 eventStore .removeEventUnderProcessing (event .resourceUid ());
126134 if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
127- CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (event .resourceUid ());
128- log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}," +
129- " Most recent event: {}" , event , notScheduledEvent );
130- scheduleEvent (notScheduledEvent );
135+ log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}" , event );
136+ scheduleNotYetScheduledEventForExecution (event .resourceUid ());
131137 } else {
132138 log .debug ("Event processing failed. Attempting to re-schedule the event: {}" , event );
133- scheduleEvent (event );
139+ scheduleEventForExecution (event );
134140 }
135141 } finally {
136142 lock .unlock ();
137143 }
138144 }
139145
146+ private void scheduleNotYetScheduledEventForExecution (String uuid ) {
147+ CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (uuid );
148+ scheduleEventForExecution (notScheduledEvent );
149+ }
150+
140151 @ Override
141152 public void onClose (KubernetesClientException e ) {
142153 log .error ("Error: " , e );
0 commit comments