1- package io .javaoperatorsdk .operator .processing ;
1+ package io .javaoperatorsdk .operator .processing . event ;
22
33import java .util .HashMap ;
44import java .util .HashSet ;
1414
1515import io .fabric8 .kubernetes .api .model .HasMetadata ;
1616import io .javaoperatorsdk .operator .OperatorException ;
17- import io .javaoperatorsdk .operator .api .LifecycleAware ;
1817import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
1918import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
2019import io .javaoperatorsdk .operator .api .monitoring .Metrics ;
2120import io .javaoperatorsdk .operator .api .reconciler .RetryInfo ;
22- import io .javaoperatorsdk .operator .processing .event . Event ;
23- import io .javaoperatorsdk .operator .processing .event . EventHandler ;
24- import io .javaoperatorsdk .operator .processing .event . EventSourceManager ;
25- import io .javaoperatorsdk .operator .processing .event .ResourceID ;
26- import io .javaoperatorsdk .operator .processing .event .internal . ResourceAction ;
27- import io .javaoperatorsdk .operator .processing .event .internal . ResourceEvent ;
21+ import io .javaoperatorsdk .operator .processing .LifecycleAware ;
22+ import io .javaoperatorsdk .operator .processing .MDCUtils ;
23+ import io .javaoperatorsdk .operator .processing .ResourceCache ;
24+ import io .javaoperatorsdk .operator .processing .event .source . ResourceAction ;
25+ import io .javaoperatorsdk .operator .processing .event .source . ResourceEvent ;
26+ import io .javaoperatorsdk .operator .processing .event .source . TimerEventSource ;
2827import io .javaoperatorsdk .operator .processing .retry .GenericRetry ;
2928import io .javaoperatorsdk .operator .processing .retry .Retry ;
3029import io .javaoperatorsdk .operator .processing .retry .RetryExecution ;
3635 * Event handler that makes sure that events are processed in a "single threaded" way per resource
3736 * UID, while buffering events which are received during an execution.
3837 */
39- public class EventProcessor <R extends HasMetadata >
40- implements EventHandler , LifecycleAware {
38+ class EventProcessor <R extends HasMetadata > implements EventHandler , LifecycleAware {
4139
4240 private static final Logger log = LoggerFactory .getLogger (EventProcessor .class );
4341
@@ -51,32 +49,34 @@ public class EventProcessor<R extends HasMetadata>
5149 private final Metrics metrics ;
5250 private volatile boolean running ;
5351 private final ResourceCache <R > resourceCache ;
54- private EventSourceManager <R > eventSourceManager ;
52+ private final EventSourceManager <R > eventSourceManager ;
5553 private final EventMarker eventMarker ;
5654
57- public EventProcessor (Controller <R > controller , ResourceCache < R > resourceCache ) {
55+ EventProcessor (EventSourceManager <R > eventSourceManager ) {
5856 this (
59- resourceCache ,
57+ eventSourceManager . getControllerResourceEventSource () ,
6058 ExecutorServiceManager .instance ().executorService (),
61- controller .getConfiguration ().getName (),
62- new ReconciliationDispatcher <>(controller ),
63- GenericRetry .fromConfiguration (controller .getConfiguration ().getRetryConfiguration ()),
64- controller .getConfiguration ().getConfigurationService ().getMetrics (),
65- new EventMarker ());
59+ eventSourceManager .getController ().getConfiguration ().getName (),
60+ new ReconciliationDispatcher <>(eventSourceManager .getController ()),
61+ GenericRetry .fromConfiguration (
62+ eventSourceManager .getController ().getConfiguration ().getRetryConfiguration ()),
63+ eventSourceManager .getController ().getConfiguration ().getConfigurationService ()
64+ .getMetrics (),
65+ eventSourceManager );
6666 }
6767
6868 EventProcessor (ReconciliationDispatcher <R > reconciliationDispatcher ,
69- ResourceCache <R > resourceCache ,
69+ EventSourceManager <R > eventSourceManager ,
7070 String relatedControllerName ,
71- Retry retry , EventMarker eventMarker ) {
72- this (resourceCache , null , relatedControllerName , reconciliationDispatcher , retry , null ,
73- eventMarker );
71+ Retry retry ) {
72+ this (eventSourceManager . getControllerResourceEventSource () , null , relatedControllerName ,
73+ reconciliationDispatcher , retry , null , eventSourceManager );
7474 }
7575
7676 private EventProcessor (ResourceCache <R > resourceCache , ExecutorService executor ,
7777 String relatedControllerName ,
7878 ReconciliationDispatcher <R > reconciliationDispatcher , Retry retry , Metrics metrics ,
79- EventMarker eventMarker ) {
79+ EventSourceManager < R > eventSourceManager ) {
8080 this .running = true ;
8181 this .executor =
8282 executor == null
@@ -88,11 +88,12 @@ private EventProcessor(ResourceCache<R> resourceCache, ExecutorService executor,
8888 this .retry = retry ;
8989 this .resourceCache = resourceCache ;
9090 this .metrics = metrics != null ? metrics : Metrics .NOOP ;
91- this .eventMarker = eventMarker ;
91+ this .eventMarker = new EventMarker ();
92+ this .eventSourceManager = eventSourceManager ;
9293 }
9394
94- public void setEventSourceManager ( EventSourceManager < R > eventSourceManager ) {
95- this . eventSourceManager = eventSourceManager ;
95+ EventMarker getEventMarker ( ) {
96+ return eventMarker ;
9697 }
9798
9899 @ Override
@@ -243,9 +244,12 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope<R> execution
243244
244245 private void reScheduleExecutionIfInstructed (PostExecutionControl <R > postExecutionControl ,
245246 R customResource ) {
246- postExecutionControl .getReScheduleDelay ().ifPresent (delay -> eventSourceManager
247- .getRetryAndRescheduleTimerEventSource ()
248- .scheduleOnce (customResource , delay ));
247+ postExecutionControl .getReScheduleDelay ()
248+ .ifPresent (delay -> retryEventSource ().scheduleOnce (customResource , delay ));
249+ }
250+
251+ TimerEventSource <R > retryEventSource () {
252+ return eventSourceManager .retryEventSource ();
249253 }
250254
251255 /**
@@ -275,9 +279,7 @@ private void handleRetryOnException(ExecutionScope<R> executionScope,
275279 delay ,
276280 customResourceID );
277281 metrics .failedReconciliation (customResourceID , exception );
278- eventSourceManager
279- .getRetryAndRescheduleTimerEventSource ()
280- .scheduleOnce (executionScope .getResource (), delay );
282+ retryEventSource ().scheduleOnce (executionScope .getResource (), delay );
281283 },
282284 () -> log .error ("Exhausted retries for {}" , executionScope ));
283285 }
@@ -289,9 +291,7 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
289291 if (isRetryConfigured ()) {
290292 retryState .remove (executionScope .getCustomResourceID ());
291293 }
292- eventSourceManager
293- .getRetryAndRescheduleTimerEventSource ()
294- .cancelOnceSchedule (executionScope .getCustomResourceID ());
294+ retryEventSource ().cancelOnceSchedule (executionScope .getCustomResourceID ());
295295 }
296296
297297 private RetryExecution getOrInitRetryExecution (ExecutionScope <R > executionScope ) {
0 commit comments