Skip to content

Commit 1883727

Browse files
committed
retry basic impl
1 parent 8aa933b commit 1883727

File tree

13 files changed

+135
-70
lines changed

13 files changed

+135
-70
lines changed

operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.javaoperatorsdk.operator.processing.CustomResourceCache;
1616
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
1717
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
18+
import io.javaoperatorsdk.operator.processing.retry.Retry;
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
2021

@@ -38,16 +39,16 @@ public Operator(KubernetesClient k8sClient) {
3839

3940

4041
public <R extends CustomResource> void registerControllerForAllNamespaces(ResourceController<R> controller) throws OperatorException {
41-
registerController(controller, true);
42+
registerController(controller, true, null);
4243
}
4344

4445
public <R extends CustomResource> void registerController(ResourceController<R> controller, String... targetNamespaces) throws OperatorException {
45-
registerController(controller, false, targetNamespaces);
46+
registerController(controller, false, null, targetNamespaces);
4647
}
4748

4849
@SuppressWarnings("rawtypes")
4950
private <R extends CustomResource> void registerController(ResourceController<R> controller,
50-
boolean watchAllNamespaces, String... targetNamespaces) throws OperatorException {
51+
boolean watchAllNamespaces, Retry retry, String... targetNamespaces) throws OperatorException {
5152
Class<R> resClass = getCustomResourceClass(controller);
5253
CustomResourceDefinitionContext crd = getCustomResourceDefinitionForController(controller);
5354
KubernetesDeserializer.registerCustomKind(crd.getVersion(), crd.getKind(), resClass);
@@ -58,8 +59,8 @@ private <R extends CustomResource> void registerController(ResourceController<R>
5859

5960

6061
CustomResourceCache customResourceCache = new CustomResourceCache();
61-
DefaultEventHandler defaultEventHandler = new DefaultEventHandler(customResourceCache, eventDispatcher, controller.getClass().getName());
62-
DefaultEventSourceManager eventSourceManager = new DefaultEventSourceManager(defaultEventHandler);
62+
DefaultEventHandler defaultEventHandler = new DefaultEventHandler(customResourceCache, eventDispatcher, controller.getClass().getName(), retry);
63+
DefaultEventSourceManager eventSourceManager = new DefaultEventSourceManager(defaultEventHandler, retry != null);
6364
defaultEventHandler.setDefaultEventSourceManager(eventSourceManager);
6465
eventDispatcher.setEventSourceManager(eventSourceManager);
6566

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,15 @@
55
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
66
import io.javaoperatorsdk.operator.processing.event.Event;
77
import io.javaoperatorsdk.operator.processing.event.EventHandler;
8+
import io.javaoperatorsdk.operator.processing.retry.Retry;
9+
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
810
import org.slf4j.Logger;
911
import org.slf4j.LoggerFactory;
1012

11-
import java.util.HashSet;
12-
import java.util.Optional;
13-
import java.util.Set;
13+
import java.util.*;
1414
import java.util.concurrent.ScheduledThreadPoolExecutor;
1515
import java.util.concurrent.ThreadFactory;
1616
import java.util.concurrent.locks.ReentrantLock;
17-
import java.util.function.Predicate;
1817

1918
import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent;
2019
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
@@ -34,13 +33,17 @@ public class DefaultEventHandler implements EventHandler {
3433
private final Set<String> underProcessing = new HashSet<>();
3534
private final ScheduledThreadPoolExecutor executor;
3635
private final EventDispatcher eventDispatcher;
36+
private final Retry retry;
37+
private final Map<String, RetryExecution> retryState = new HashMap<>();
3738
private DefaultEventSourceManager defaultEventSourceManager;
3839

3940
private final ReentrantLock lock = new ReentrantLock();
4041

41-
public DefaultEventHandler(CustomResourceCache customResourceCache, EventDispatcher eventDispatcher, String relatedControllerName) {
42+
public DefaultEventHandler(CustomResourceCache customResourceCache, EventDispatcher eventDispatcher, String relatedControllerName,
43+
Retry retry) {
4244
this.customResourceCache = customResourceCache;
4345
this.eventDispatcher = eventDispatcher;
46+
this.retry = retry;
4447
eventBuffer = new EventBuffer();
4548
executor = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
4649
@Override
@@ -90,6 +93,13 @@ void eventProcessingFinished(ExecutionScope executionScope, PostExecutionControl
9093
lock.lock();
9194
log.debug("Event processing finished. Scope: {}", executionScope);
9295
unsetUnderExecution(executionScope.getCustomResourceUid());
96+
97+
if (retry != null && postExecutionControl.exceptionDuringExecution()) {
98+
handleRetryOnException(executionScope, postExecutionControl);
99+
} else if (retry != null) {
100+
handleSuccessfulExecutionRegardingRetry(executionScope);
101+
}
102+
93103
if (containsCustomResourceDeletedEvent(executionScope.getEvents())) {
94104
cleanupAfterDeletedEvent(executionScope.getCustomResourceUid());
95105
} else {
@@ -101,16 +111,49 @@ void eventProcessingFinished(ExecutionScope executionScope, PostExecutionControl
101111
}
102112
}
103113

114+
/**
115+
* Regarding the events there are 2 approaches we can take. Either retry always when there are new events (received meanwhile retry
116+
* is in place or already in buffer) instantly or always wait according to the retry timing if there was an exception.
117+
*/
118+
private void handleRetryOnException(ExecutionScope executionScope, PostExecutionControl postExecutionControl) {
119+
RetryExecution execution = getOrInitRetryExecution(executionScope);
120+
boolean newEventsExists = eventBuffer.newEventsExists(executionScope.getCustomResourceUid());
121+
eventBuffer.putBackEvents(executionScope.getCustomResourceUid(), executionScope.getEvents());
122+
123+
Optional<Long> nextDelay = execution.nextDelay();
124+
if (newEventsExists) {
125+
executeBufferedEvents(executionScope.getCustomResourceUid());
126+
return;
127+
}
128+
nextDelay.ifPresent(delay ->
129+
defaultEventSourceManager.getRetryTimerEventSource()
130+
.scheduleOnce(executionScope.getCustomResource(), delay));
131+
}
132+
133+
private void handleSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) {
134+
retryState.remove(executionScope.getCustomResourceUid());
135+
defaultEventSourceManager.getRetryTimerEventSource().cancelOnceSchedule(executionScope.getCustomResourceUid());
136+
}
137+
138+
private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) {
139+
RetryExecution retryExecution = retryState.get(executionScope.getCustomResourceUid());
140+
if (retryExecution == null) {
141+
retryExecution = retry.initExecution();
142+
retryState.put(executionScope.getCustomResourceUid(), retryExecution);
143+
}
144+
return retryExecution;
145+
}
146+
104147
/**
105148
* Here we try to cache the latest resource after an update. The goal is to solve a concurrency issue we've seen:
106149
* If an execution is finished, where we updated a custom resource, but there are other events already buffered for next
107150
* execution, we might not get the newest custom resource from CustomResource event source in time. Thus we execute
108151
* the next batch of events but with a non up to date CR. Here we cache the latest CustomResource from the update
109152
* execution so we make sure its already used in the up-coming execution.
110-
*
153+
* <p>
111154
* Note that this is an improvement, not a bug fix. This situation can happen naturally, we just make the execution more
112155
* efficient, and avoid questions about conflicts.
113-
*
156+
* <p>
114157
* Note that without the conditional locking in the cache, there is a very minor chance that we would override an
115158
* additional change coming from a different client.
116159
*/

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ public void addEvent(Event event) {
1515
crEvents.add(event);
1616
}
1717

18+
public boolean newEventsExists(String resourceId) {
19+
return !events.get(resourceId).isEmpty();
20+
}
21+
22+
public void putBackEvents(String resourceUid, List<Event> oldEvents) {
23+
events.get(resourceUid).addAll(0, oldEvents);
24+
}
25+
1826
public boolean containsEvents(String customResourceId) {
1927
return events.get(customResourceId) != null;
2028
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,12 @@
55
import io.fabric8.kubernetes.client.dsl.MixedOperation;
66
import io.fabric8.kubernetes.client.dsl.Resource;
77
import io.javaoperatorsdk.operator.api.*;
8-
import io.javaoperatorsdk.operator.processing.event.Event;
98
import io.javaoperatorsdk.operator.processing.event.EventList;
109
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
11-
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent;
1210
import org.slf4j.Logger;
1311
import org.slf4j.LoggerFactory;
1412

1513
import java.util.ArrayList;
16-
import java.util.Map;
17-
import java.util.concurrent.ConcurrentHashMap;
1814

1915
import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent;
2016
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.*;
@@ -43,16 +39,16 @@ public void setEventSourceManager(EventSourceManager eventSourceManager) {
4339
this.eventSourceManager = eventSourceManager;
4440
}
4541

46-
public PostExecutionControl handleEvent(ExecutionScope event) {
42+
public PostExecutionControl handleExecution(ExecutionScope executionScope) {
4743
try {
48-
return handDispatch(event);
44+
return handleDispatch(executionScope);
4945
} catch (RuntimeException e) {
50-
log.error("Error during event processing {} failed.", event, e);
51-
return PostExecutionControl.defaultDispatch();
46+
log.error("Error during event processing {} failed.", executionScope, e);
47+
return PostExecutionControl.exceptionDuringExecution(e);
5248
}
5349
}
5450

55-
private PostExecutionControl handDispatch(ExecutionScope executionScope) {
51+
private PostExecutionControl handleDispatch(ExecutionScope executionScope) {
5652
CustomResource resource = executionScope.getCustomResource();
5753
log.debug("Handling events: {} for resource {}", executionScope.getEvents(), resource.getMetadata());
5854

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class ExecutionConsumer implements Runnable {
1919

2020
@Override
2121
public void run() {
22-
PostExecutionControl postExecutionControl = eventDispatcher.handleEvent(executionScope);
22+
PostExecutionControl postExecutionControl = eventDispatcher.handleExecution(executionScope);
2323
defaultEventHandler.eventProcessingFinished(executionScope, postExecutionControl);
2424
}
2525

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.javaoperatorsdk.operator.processing;
22

33
import io.fabric8.kubernetes.client.CustomResource;
4-
import io.javaoperatorsdk.operator.api.UpdateControl;
54

65
import java.util.Optional;
76

@@ -11,21 +10,28 @@ public final class PostExecutionControl {
1110

1211
private final CustomResource updatedCustomResource;
1312

14-
private PostExecutionControl(boolean onlyFinalizerHandled, CustomResource updatedCustomResource) {
13+
private final RuntimeException runtimeException;
14+
15+
private PostExecutionControl(boolean onlyFinalizerHandled, CustomResource updatedCustomResource, RuntimeException runtimeException) {
1516
this.onlyFinalizerHandled = onlyFinalizerHandled;
1617
this.updatedCustomResource = updatedCustomResource;
18+
this.runtimeException = runtimeException;
1719
}
1820

1921
public static PostExecutionControl onlyFinalizerAdded() {
20-
return new PostExecutionControl(true, null);
22+
return new PostExecutionControl(true, null, null);
2123
}
2224

2325
public static PostExecutionControl defaultDispatch() {
24-
return new PostExecutionControl(false, null);
26+
return new PostExecutionControl(false, null, null);
2527
}
2628

2729
public static PostExecutionControl customResourceUpdated(CustomResource updatedCustomResource) {
28-
return new PostExecutionControl(false, updatedCustomResource);
30+
return new PostExecutionControl(false, updatedCustomResource, null);
31+
}
32+
33+
public static PostExecutionControl exceptionDuringExecution(RuntimeException exception) {
34+
return new PostExecutionControl(false, null, exception);
2935
}
3036

3137
public boolean isOnlyFinalizerHandled() {
@@ -39,4 +45,12 @@ public Optional<CustomResource> getUpdatedCustomResource() {
3945
public boolean customResourceUpdatedDuringExecution() {
4046
return updatedCustomResource != null;
4147
}
48+
49+
public boolean exceptionDuringExecution() {
50+
return runtimeException != null;
51+
}
52+
53+
public Optional<RuntimeException> getRuntimeException() {
54+
return Optional.ofNullable(runtimeException);
55+
}
4256
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.javaoperatorsdk.operator.processing.DefaultEventHandler;
44
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
5+
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
56
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
78

@@ -12,15 +13,22 @@
1213

1314
public class DefaultEventSourceManager implements EventSourceManager {
1415

16+
public static final String RETRY_TIMER_EVENT_SOURCE_NAME = "retry-timer-event-source";
1517
private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class);
1618

19+
1720
private final ReentrantLock lock = new ReentrantLock();
1821
private Map<String, EventSource> eventSources = new ConcurrentHashMap<>();
1922
private CustomResourceEventSource customResourceEventSource;
2023
private DefaultEventHandler defaultEventHandler;
24+
private TimerEventSource retryTimerEventSource;
2125

22-
public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler) {
26+
public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) {
2327
this.defaultEventHandler = defaultEventHandler;
28+
if (supportRetry) {
29+
this.retryTimerEventSource = new TimerEventSource();
30+
registerEventSource(RETRY_TIMER_EVENT_SOURCE_NAME, retryTimerEventSource);
31+
}
2432
}
2533

2634
public void registerCustomResourceEventSource(CustomResourceEventSource customResourceEventSource) {
@@ -36,15 +44,15 @@ public <T extends EventSource> void registerEventSource(String name, T eventSour
3644
if (currentEventSource != null) {
3745
throw new IllegalStateException("Event source with name already registered. Event source name: " + name);
3846
}
39-
eventSources.put(name,eventSource);
47+
eventSources.put(name, eventSource);
4048
eventSource.setEventHandler(defaultEventHandler);
4149
} finally {
4250
lock.unlock();
4351
}
4452
}
4553

4654
@Override
47-
public Optional<EventSource> deRegisterCustomResourceFromEventSource(String eventSourceName,String customResourceUid) {
55+
public Optional<EventSource> deRegisterCustomResourceFromEventSource(String eventSourceName, String customResourceUid) {
4856
try {
4957
lock.lock();
5058
EventSource eventSource = this.eventSources.get(eventSourceName);
@@ -60,13 +68,17 @@ public Optional<EventSource> deRegisterCustomResourceFromEventSource(String even
6068
}
6169
}
6270

71+
public TimerEventSource getRetryTimerEventSource() {
72+
return retryTimerEventSource;
73+
}
74+
6375
@Override
6476
public Map<String, EventSource> getRegisteredEventSources() {
6577
return Collections.unmodifiableMap(eventSources);
6678
}
6779

6880
public void cleanup(String customResourceUid) {
69-
getRegisteredEventSources().keySet().forEach(k -> deRegisterCustomResourceFromEventSource(k,customResourceUid));
81+
getRegisteredEventSources().keySet().forEach(k -> deRegisterCustomResourceFromEventSource(k, customResourceUid));
7082
eventSources.remove(customResourceUid);
7183
}
7284

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public static GenericRetry defaultLimitedExponentialRetry() {
1616
}
1717

1818
public static GenericRetry noRetry() {
19-
return new GenericRetry().setMaxAttempts(1);
19+
return new GenericRetry().setMaxAttempts(0);
2020
}
2121

2222
public static GenericRetry every10second10TimesRetry() {

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,7 @@ public GenericRetryExecution(GenericRetry genericRetry) {
1515
this.currentInterval = genericRetry.getInitialInterval();
1616
}
1717

18-
/**
19-
* Note that first attempt is always 0. Since this implementation is tailored for event scheduling.
20-
*/
2118
public Optional<Long> nextDelay() {
22-
if (lastAttemptIndex == 0) {
23-
lastAttemptIndex++;
24-
return Optional.of(0L);
25-
}
2619
if (genericRetry.getMaxAttempts() > -1 && lastAttemptIndex >= genericRetry.getMaxAttempts()) {
2720
return Optional.empty();
2821
}

0 commit comments

Comments
 (0)