@@ -32,7 +32,7 @@ class EventSchedulerTest {
3232 @ SuppressWarnings ("unchecked" )
3333 private EventDispatcher eventDispatcher = mock (EventDispatcher .class );
3434
35- private EventScheduler eventScheduler = new EventScheduler ( eventDispatcher , new GenericRetry (). setMaxAttempts ( MAX_RETRY_ATTEMPTS ). withLinearRetry () );
35+ private EventScheduler eventScheduler = initScheduler ( true );
3636
3737 private List <EventProcessingDetail > eventProcessingList = Collections .synchronizedList (new ArrayList <>());
3838
@@ -55,6 +55,7 @@ public void eventsAreNotExecutedConcurrentlyForSameResource() throws Interrupted
5555 CustomResource resource1 = sampleResource ();
5656 CustomResource resource2 = sampleResource ();
5757 resource2 .getMetadata ().setResourceVersion ("2" );
58+ resource2 .getMetadata ().setGeneration (2l );
5859
5960 eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 );
6061 eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 );
@@ -70,14 +71,56 @@ public void eventsAreNotExecutedConcurrentlyForSameResource() throws Interrupted
7071 "Start time of event 2 is after end time of event 1" );
7172 }
7273
74+ @ Test
75+ public void generationAwareSchedulingSkipsEventsWithoutIncreasedGeneration () {
76+ normalDispatcherExecution ();
77+ CustomResource resource1 = sampleResource ();
78+ CustomResource resource2 = sampleResource ();
79+ resource2 .getMetadata ().setResourceVersion ("2" );
80+
81+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 );
82+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 );
83+
84+ waitTimeForExecution (2 );
85+ assertThat (eventProcessingList ).hasSize (1 )
86+ .matches (list ->
87+ eventProcessingList .get (0 ).getCustomResource ().getMetadata ().getResourceVersion ().equals ("1" ));
88+
89+ }
90+
91+ @ Test
92+ public void notGenerationAwareSchedulingProcessesAllEventsRegardlessOfGeneration () {
93+ generationUnAwareScheduler ();
94+ normalDispatcherExecution ();
95+ CustomResource resource1 = sampleResource ();
96+ CustomResource resource2 = sampleResource ();
97+ resource2 .getMetadata ().setResourceVersion ("2" );
98+
99+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 );
100+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 );
101+
102+ waitTimeForExecution (2 );
103+ log .info ("Event processing details 1.: {}. 2: {}" , eventProcessingList .get (0 ), eventProcessingList .get (1 ));
104+ assertThat (eventProcessingList ).hasSize (2 )
105+ .matches (list -> eventProcessingList .get (0 ).getCustomResource ().getMetadata ().getResourceVersion ().equals ("1" ) &&
106+ eventProcessingList .get (1 ).getCustomResource ().getMetadata ().getResourceVersion ().equals ("2" ),
107+ "Events processed in correct order" )
108+ .matches (list ->
109+ eventProcessingList .get (0 ).getEndTime ().isBefore (eventProcessingList .get (1 ).startTime ),
110+ "Start time of event 2 is after end time of event 1" );
111+ }
112+
113+ // note that this is true for generation aware scheduling
73114 @ Test
74115 public void onlyLastEventIsScheduledIfMoreReceivedDuringAndExecution () {
75116 normalDispatcherExecution ();
76117 CustomResource resource1 = sampleResource ();
77118 CustomResource resource2 = sampleResource ();
78119 resource2 .getMetadata ().setResourceVersion ("2" );
120+ resource2 .getMetadata ().setGeneration (2l );
79121 CustomResource resource3 = sampleResource ();
80122 resource3 .getMetadata ().setResourceVersion ("3" );
123+ resource3 .getMetadata ().setGeneration (3l );
81124
82125 eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 );
83126 eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 );
@@ -118,6 +161,7 @@ public void processesNewEventIfItIsReceivedAfterExecutionInError() {
118161 CustomResource resource1 = sampleResource ();
119162 CustomResource resource2 = sampleResource ();
120163 resource2 .getMetadata ().setResourceVersion ("2" );
164+ resource2 .getMetadata ().setGeneration (2l );
121165
122166 doAnswer (this ::exceptionInExecution ).when (eventDispatcher ).handleEvent (any (Watcher .Action .class ), eq (resource1 ));
123167 doAnswer (this ::normalExecution ).when (eventDispatcher ).handleEvent (any (Watcher .Action .class ), eq (resource2 ));
@@ -140,7 +184,7 @@ public void processesNewEventIfItIsReceivedAfterExecutionInError() {
140184 }
141185
142186 @ Test
143- public void numberOfRetriesIsLimited () {
187+ public void numberOfRetriesCanBeLimited () {
144188 doAnswer (this ::exceptionInExecution ).when (eventDispatcher ).handleEvent (any (Watcher .Action .class ), any (CustomResource .class ));
145189
146190 eventScheduler .eventReceived (Watcher .Action .MODIFIED , sampleResource ());
@@ -166,6 +210,14 @@ private Object normalExecution(InvocationOnMock invocation) {
166210 }
167211 }
168212
213+ private void generationUnAwareScheduler () {
214+ eventScheduler = initScheduler (false );
215+ }
216+
217+ private EventScheduler initScheduler (boolean generationAware ) {
218+ return new EventScheduler (eventDispatcher ,
219+ new GenericRetry ().setMaxAttempts (MAX_RETRY_ATTEMPTS ).withLinearRetry (), generationAware );
220+ }
169221
170222 private Object exceptionInExecution (InvocationOnMock invocation ) {
171223 try {
@@ -203,7 +255,7 @@ CustomResource sampleResource() {
203255 resource .setMetadata (new ObjectMetaBuilder ()
204256 .withCreationTimestamp ("creationTimestamp" )
205257 .withDeletionGracePeriodSeconds (10L )
206- .withGeneration (10L )
258+ .withGeneration (1L )
207259 .withName ("name" )
208260 .withNamespace ("namespace" )
209261 .withResourceVersion ("1" )
0 commit comments