88import io .fabric8 .kubernetes .client .CustomResource ;
99import io .fabric8 .kubernetes .client .Watcher ;
1010import org .assertj .core .api .Condition ;
11- import org .junit .jupiter .api .Disabled ;
1211import org .junit .jupiter .api .Test ;
1312import org .mockito .invocation .InvocationOnMock ;
1413
1514import java .time .LocalDateTime ;
1615import java .util .ArrayList ;
1716import java .util .Collections ;
1817import java .util .List ;
19- import java .util .concurrent .CompletableFuture ;
2018
21- import static com .github .containersolutions .operator .processing .retry .GenericRetry .* ;
19+ import static com .github .containersolutions .operator .processing .retry .GenericRetry .DEFAULT_INITIAL_INTERVAL ;
2220import static org .assertj .core .api .Assertions .assertThat ;
2321import static org .assertj .core .api .Assertions .atIndex ;
2422import static org .mockito .Mockito .*;
2523
2624class EventSchedulerTest {
2725
2826 public static final int INVOCATION_DURATION = 80 ;
27+ public static final int MAX_RETRY_ATTEMPTS = 3 ;
2928 @ SuppressWarnings ("unchecked" )
3029 private EventDispatcher eventDispatcher = mock (EventDispatcher .class );
3130
32- private EventScheduler eventScheduler = new EventScheduler (eventDispatcher , GenericRetry . defaultLimitedExponentialRetry ());
31+ private EventScheduler eventScheduler = new EventScheduler (eventDispatcher , new GenericRetry (). setMaxAttempts ( MAX_RETRY_ATTEMPTS ). withLinearRetry ());
3332
3433 private List <EventProcessingDetail > eventProcessingList = Collections .synchronizedList (new ArrayList <>());
3534
@@ -53,9 +52,8 @@ public void eventsAreNotExecutedConcurrentlyForSameResource() throws Interrupted
5352 CustomResource resource2 = sampleResource ();
5453 resource2 .getMetadata ().setResourceVersion ("2" );
5554
56- CompletableFuture .runAsync (() -> eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 ));
57- Thread .sleep (50 );
58- CompletableFuture .runAsync (() -> eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 ));
55+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 );
56+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 );
5957
6058 waitTimeForExecution (2 );
6159 assertThat (eventProcessingList ).hasSize (2 )
@@ -85,44 +83,41 @@ public void retriesEventsWithErrors() {
8583 .has (new Condition <>(e -> e .getException () == null , "" ), atIndex (1 ));
8684 }
8785
88- @ Disabled ( "Todo change according to new scheduling" )
86+
8987 @ Test
90- public void schedulesEventIfOlderVersionIsAlreadyUnderProcessing () {
91- normalDispatcherExecution ();
88+ public void processesNewEventIfItIsReceivedAfterExecutionInError () {
9289 CustomResource resource1 = sampleResource ();
9390 CustomResource resource2 = sampleResource ();
9491 resource2 .getMetadata ().setResourceVersion ("2" );
9592
96- doAnswer (invocation -> {
97- Object [] args = invocation .getArguments ();
98- LocalDateTime start = LocalDateTime .now ();
99- CompletableFuture .runAsync (() -> eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 ));
100- Thread .sleep (INVOCATION_DURATION );
101- LocalDateTime end = LocalDateTime .now ();
102- eventProcessingList .add (new EventProcessingDetail ((Watcher .Action ) args [0 ], start , end , (CustomResource ) args [1 ]));
103- return null ;
104- }).doAnswer (this ::normalExecution ).when (eventDispatcher ).handleEvent (any (Watcher .Action .class ), any (CustomResource .class ));
93+ doAnswer (this ::exceptionInExecution ).when (eventDispatcher ).handleEvent (any (Watcher .Action .class ), eq (resource1 ));
94+ doAnswer (this ::normalExecution ).when (eventDispatcher ).handleEvent (any (Watcher .Action .class ), eq (resource2 ));
10595
106- CompletableFuture .runAsync (() -> eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 ));
96+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 );
97+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 );
10798
10899 waitTimeForExecution (2 );
100+
109101 assertThat (eventProcessingList ).hasSize (2 )
110102 .matches (list -> eventProcessingList .get (0 ).getCustomResource ().getMetadata ().getResourceVersion ().equals ("1" ) &&
111103 eventProcessingList .get (1 ).getCustomResource ().getMetadata ().getResourceVersion ().equals ("2" ),
112104 "Events processed in correct order" )
113105 .matches (list ->
114106 eventProcessingList .get (0 ).getEndTime ().isBefore (eventProcessingList .get (1 ).startTime ),
115107 "Start time of event 2 is after end time of event 1" );
108+
109+ assertThat (eventProcessingList .get (0 ).getException ()).isNotNull ();
110+ assertThat (eventProcessingList .get (1 ).getException ()).isNull ();
116111 }
117112
118113 @ Test
119114 public void numberOfRetriesIsLimited () {
120115 doAnswer (this ::exceptionInExecution ).when (eventDispatcher ).handleEvent (any (Watcher .Action .class ), any (CustomResource .class ));
121116
122- CompletableFuture . runAsync (() -> eventScheduler .eventReceived (Watcher .Action .MODIFIED , sampleResource () ));
117+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , sampleResource ());
123118
124- waitTimeForExecution (1 , DEFAULT_MAX_ATTEMPTS + 2 );
125- assertThat (eventProcessingList ).hasSize (DEFAULT_MAX_ATTEMPTS );
119+ waitTimeForExecution (1 , MAX_RETRY_ATTEMPTS + 2 );
120+ assertThat (eventProcessingList ).hasSize (MAX_RETRY_ATTEMPTS );
126121 }
127122
128123 public void normalDispatcherExecution () {
@@ -167,8 +162,8 @@ private void waitTimeForExecution(int numberOfEvents) {
167162
168163 private void waitTimeForExecution (int numberOfEvents , int retries ) {
169164 try {
170- Thread .sleep (( long ) ( 200 + ((INVOCATION_DURATION + 30 ) * numberOfEvents ) + (retries * (INVOCATION_DURATION + 100 )) +
171- ( Math . pow ( DEFAULT_MULTIPLIER , retries ) * (DEFAULT_INITIAL_INTERVAL + 100 )) ));
165+ Thread .sleep (200 + ((INVOCATION_DURATION + 30 ) * numberOfEvents ) + (retries * (INVOCATION_DURATION + 100 )) +
166+ retries * (DEFAULT_INITIAL_INTERVAL + 100 ));
172167 } catch (InterruptedException e ) {
173168 throw new IllegalStateException (e );
174169 }
0 commit comments