@@ -79,25 +79,20 @@ void scheduleEvent(CustomResourceEvent event) {
7979 // Note that we always use finalizers, we want to process delete event just in corner case,
8080 // when we are not able to add finalizer (lets say because of optimistic locking error, and the resource was deleted instantly).
8181 // We want to skip in case of finalizer was there since we don't want to execute delete method always at least 2x,
82- // which would be the result if we don't skip here. (If there is no deletion timestamp if resource deleted without finalizer.
82+ // which would be the result if we don't skip here. (there is no deletion timestamp if resource deleted without finalizer.)
8383 log .debug ("Skipping delete event since deletion timestamp is present on resource, so finalizer was in place." );
8484 return ;
8585 }
86- if (eventStore .receivedMoreRecentEventBefore (event )) {
87- log .debug ("Skipping event processing since was processed event with newer version before. {}" , event );
88- return ;
89- }
90- eventStore .updateLatestResourceVersionReceived (event );
91-
92- if (eventStore .containsOlderVersionOfNotScheduledEvent (event )) {
93- log .debug ("Replacing event which is not scheduled yet, since incoming event is more recent. new Event:{}" , event );
94- eventStore .addOrReplaceEventAsNotScheduledYet (event );
86+ if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
87+ log .debug ("Replacing not scheduled event with actual event." +
88+ " New event: {}" , event );
89+ eventStore .addOrReplaceEventAsNotScheduled (event );
9590 return ;
9691 }
97- if (eventStore .containsOlderVersionOfEventUnderProcessing (event )) {
92+ if (eventStore .containsEventUnderProcessing (event . resourceUid () )) {
9893 log .debug ("Scheduling event for later processing since there is an event under processing for same kind." +
9994 " New event: {}" , event );
100- eventStore .addOrReplaceEventAsNotScheduledYet (event );
95+ eventStore .addOrReplaceEventAsNotScheduled (event );
10196 return ;
10297 }
10398
@@ -107,9 +102,9 @@ void scheduleEvent(CustomResourceEvent event) {
107102 return ;
108103 }
109104 log .debug ("Creating scheduled task for event: {}" , event );
105+ eventStore .addEventUnderProcessing (event );
110106 executor .schedule (new EventConsumer (event , eventDispatcher , this ),
111107 nextBackOff .get (), TimeUnit .MILLISECONDS );
112- eventStore .addEventUnderProcessing (event );
113108 } finally {
114109 log .debug ("Scheduling event finished: {}" , event );
115110 lock .unlock ();
@@ -119,10 +114,11 @@ void scheduleEvent(CustomResourceEvent event) {
119114 void eventProcessingFinishedSuccessfully (CustomResourceEvent event ) {
120115 try {
121116 lock .lock ();
117+ log .debug ("Event processing successful for event: {}" , event );
122118 eventStore .removeEventUnderProcessing (event .resourceUid ());
123- CustomResourceEvent notScheduledYetEvent = eventStore .removeEventNotScheduledYet (event .resourceUid ());
124- if ( notScheduledYetEvent != null ) {
125- scheduleEvent (notScheduledYetEvent );
119+ if ( eventStore .containsNotScheduledEvent (event .resourceUid ())) {
120+ log . debug ( "Scheduling recent event for processing processing: {}" , event );
121+ scheduleEvent (eventStore . removeEventNotScheduled ( event . resourceUid ()) );
126122 }
127123 } finally {
128124 lock .unlock ();
@@ -133,15 +129,13 @@ void eventProcessingFailed(CustomResourceEvent event) {
133129 try {
134130 lock .lock ();
135131 eventStore .removeEventUnderProcessing (event .resourceUid ());
136- CustomResourceEvent notScheduledYetEvent = eventStore .removeEventNotScheduledYet (event .resourceUid ());
137- if (notScheduledYetEvent != null ) {
138- if (!notScheduledYetEvent .isSameResourceAndNewerVersion (event )) {
139- log .warn ("The not yet scheduled event has older version then actual event. This is probably a bug." );
140- }
141- // this is the case when we failed processing an event but we already received a new one.
142- // Since since we process declarative resources it correct to schedule the new event.
143- scheduleEvent (notScheduledYetEvent );
132+ if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
133+ CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (event .resourceUid ());
134+ log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}," +
135+ " Most recent event: {}" , event , notScheduledEvent );
136+ scheduleEvent (notScheduledEvent );
144137 } else {
138+ log .debug ("Event processing failed. Attempting to re-schedule the event: {}" , event );
145139 scheduleEvent (event );
146140 }
147141 } finally {
0 commit comments