3232
3333import static io .javaoperatorsdk .operator .processing .KubernetesResourceUtils .getName ;
3434
35- public class EventProcessor <R extends HasMetadata > implements EventHandler , LifecycleAware {
35+ public class EventProcessor <P extends HasMetadata > implements EventHandler , LifecycleAware {
3636
3737 private static final Logger log = LoggerFactory .getLogger (EventProcessor .class );
3838 private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50 ;
3939
4040 private volatile boolean running ;
4141 private final ControllerConfiguration <?> controllerConfiguration ;
42- private final ReconciliationDispatcher <R > reconciliationDispatcher ;
42+ private final ReconciliationDispatcher <P > reconciliationDispatcher ;
4343 private final Retry retry ;
4444 private final ExecutorService executor ;
4545 private final Metrics metrics ;
46- private final Cache <R > cache ;
47- private final EventSourceManager <R > eventSourceManager ;
46+ private final Cache <P > cache ;
47+ private final EventSourceManager <P > eventSourceManager ;
4848 private final RateLimiter <? extends RateLimitState > rateLimiter ;
4949 private final ResourceStateManager resourceStateManager = new ResourceStateManager ();
5050 private final Map <String , Object > metricsMetadata ;
5151
5252
53- public EventProcessor (EventSourceManager <R > eventSourceManager ) {
53+ public EventProcessor (EventSourceManager <P > eventSourceManager ) {
5454 this (
5555 eventSourceManager .getController ().getConfiguration (),
5656 eventSourceManager .getControllerResourceEventSource (),
@@ -63,8 +63,8 @@ public EventProcessor(EventSourceManager<R> eventSourceManager) {
6363 @ SuppressWarnings ("rawtypes" )
6464 EventProcessor (
6565 ControllerConfiguration controllerConfiguration ,
66- ReconciliationDispatcher <R > reconciliationDispatcher ,
67- EventSourceManager <R > eventSourceManager ,
66+ ReconciliationDispatcher <P > reconciliationDispatcher ,
67+ EventSourceManager <P > eventSourceManager ,
6868 Metrics metrics ) {
6969 this (
7070 controllerConfiguration ,
@@ -78,11 +78,11 @@ public EventProcessor(EventSourceManager<R> eventSourceManager) {
7878 @ SuppressWarnings ({"rawtypes" , "unchecked" })
7979 private EventProcessor (
8080 ControllerConfiguration controllerConfiguration ,
81- Cache <R > cache ,
81+ Cache <P > cache ,
8282 ExecutorService executor ,
83- ReconciliationDispatcher <R > reconciliationDispatcher ,
83+ ReconciliationDispatcher <P > reconciliationDispatcher ,
8484 Metrics metrics ,
85- EventSourceManager <R > eventSourceManager ) {
85+ EventSourceManager <P > eventSourceManager ) {
8686 this .controllerConfiguration = controllerConfiguration ;
8787 this .running = false ;
8888 this .executor =
@@ -136,7 +136,7 @@ private void submitReconciliationExecution(ResourceState state) {
136136 try {
137137 boolean controllerUnderExecution = isControllerUnderExecution (state );
138138 final var resourceID = state .getId ();
139- Optional <R > maybeLatest = cache .get (resourceID );
139+ Optional <P > maybeLatest = cache .get (resourceID );
140140 maybeLatest .ifPresent (MDCUtils ::addResourceInfo );
141141 if (!controllerUnderExecution && maybeLatest .isPresent ()) {
142142 var rateLimit = state .getRateLimit ();
@@ -151,9 +151,9 @@ private void submitReconciliationExecution(ResourceState state) {
151151 }
152152 state .setUnderProcessing (true );
153153 final var latest = maybeLatest .get ();
154- ExecutionScope <R > executionScope = new ExecutionScope <>(latest , state .getRetry ());
154+ ExecutionScope <P > executionScope = new ExecutionScope <>(latest , state .getRetry ());
155155 state .unMarkEventReceived ();
156- metrics .reconcileCustomResource (resourceID , state .getRetry (), metricsMetadata );
156+ metrics .reconcileCustomResource (latest , state .getRetry (), metricsMetadata );
157157 log .debug ("Executing events for custom resource. Scope: {}" , executionScope );
158158 executor .execute (new ReconcilerExecutor (executionScope ));
159159 } else {
@@ -221,7 +221,7 @@ private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimal
221221 }
222222
223223 synchronized void eventProcessingFinished (
224- ExecutionScope <R > executionScope , PostExecutionControl <R > postExecutionControl ) {
224+ ExecutionScope <P > executionScope , PostExecutionControl <P > postExecutionControl ) {
225225 if (!running ) {
226226 return ;
227227 }
@@ -244,7 +244,7 @@ synchronized void eventProcessingFinished(
244244 return ;
245245 }
246246 cleanupOnSuccessfulExecution (executionScope );
247- metrics .finishedReconciliation (resourceID , metricsMetadata );
247+ metrics .finishedReconciliation (executionScope . getResource () , metricsMetadata );
248248 if (state .deleteEventPresent ()) {
249249 cleanupForDeletedEvent (executionScope .getResourceID ());
250250 } else if (postExecutionControl .isFinalizerRemoved ()) {
@@ -253,12 +253,12 @@ synchronized void eventProcessingFinished(
253253 postExecutionControl
254254 .getUpdatedCustomResource ()
255255 .ifPresent (
256- r -> {
256+ p -> {
257257 if (!postExecutionControl .updateIsStatusPatch ()) {
258258 eventSourceManager
259259 .getControllerResourceEventSource ()
260260 .handleRecentResourceUpdate (
261- ResourceID .fromResource (r ), r , executionScope .getResource ());
261+ ResourceID .fromResource (p ), p , executionScope .getResource ());
262262 }
263263 });
264264 if (state .eventPresent ()) {
@@ -270,7 +270,7 @@ synchronized void eventProcessingFinished(
270270 }
271271
272272 private void reScheduleExecutionIfInstructed (
273- PostExecutionControl <R > postExecutionControl , R customResource ) {
273+ PostExecutionControl <P > postExecutionControl , P customResource ) {
274274
275275 postExecutionControl
276276 .getReScheduleDelay ()
@@ -281,7 +281,7 @@ private void reScheduleExecutionIfInstructed(
281281 }, () -> scheduleExecutionForMaxReconciliationInterval (customResource ));
282282 }
283283
284- private void scheduleExecutionForMaxReconciliationInterval (R customResource ) {
284+ private void scheduleExecutionForMaxReconciliationInterval (P customResource ) {
285285 this .controllerConfiguration
286286 .maxReconciliationInterval ()
287287 .ifPresent (m -> {
@@ -294,7 +294,7 @@ private void scheduleExecutionForMaxReconciliationInterval(R customResource) {
294294 });
295295 }
296296
297- TimerEventSource <R > retryEventSource () {
297+ TimerEventSource <P > retryEventSource () {
298298 return eventSourceManager .retryEventSource ();
299299 }
300300
@@ -304,7 +304,7 @@ TimerEventSource<R> retryEventSource() {
304304 * according to the retry timing if there was an exception.
305305 */
306306 private void handleRetryOnException (
307- ExecutionScope <R > executionScope , Exception exception ) {
307+ ExecutionScope <P > executionScope , Exception exception ) {
308308 final var state = getOrInitRetryExecution (executionScope );
309309 var resourceID = state .getId ();
310310 boolean eventPresent = state .eventPresent ();
@@ -323,7 +323,7 @@ private void handleRetryOnException(
323323 "Scheduling timer event for retry with delay:{} for resource: {}" ,
324324 delay ,
325325 resourceID );
326- metrics .failedReconciliation (resourceID , exception , metricsMetadata );
326+ metrics .failedReconciliation (executionScope . getResource () , exception , metricsMetadata );
327327 retryEventSource ().scheduleOnce (resourceID , delay );
328328 },
329329 () -> {
@@ -332,7 +332,7 @@ private void handleRetryOnException(
332332 });
333333 }
334334
335- private void cleanupOnSuccessfulExecution (ExecutionScope <R > executionScope ) {
335+ private void cleanupOnSuccessfulExecution (ExecutionScope <P > executionScope ) {
336336 log .debug (
337337 "Cleanup for successful execution for resource: {}" , getName (executionScope .getResource ()));
338338 if (isRetryConfigured ()) {
@@ -341,7 +341,7 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
341341 retryEventSource ().cancelOnceSchedule (executionScope .getResourceID ());
342342 }
343343
344- private ResourceState getOrInitRetryExecution (ExecutionScope <R > executionScope ) {
344+ private ResourceState getOrInitRetryExecution (ExecutionScope <P > executionScope ) {
345345 final var state = resourceStateManager .getOrCreate (executionScope .getResourceID ());
346346 RetryExecution retryExecution = state .getRetry ();
347347 if (retryExecution == null ) {
@@ -387,9 +387,9 @@ private void handleAlreadyMarkedEvents() {
387387 }
388388
389389 private class ReconcilerExecutor implements Runnable {
390- private final ExecutionScope <R > executionScope ;
390+ private final ExecutionScope <P > executionScope ;
391391
392- private ReconcilerExecutor (ExecutionScope <R > executionScope ) {
392+ private ReconcilerExecutor (ExecutionScope <P > executionScope ) {
393393 this .executionScope = executionScope ;
394394 }
395395
@@ -401,7 +401,7 @@ public void run() {
401401 try {
402402 MDCUtils .addResourceInfo (executionScope .getResource ());
403403 thread .setName ("ReconcilerExecutor-" + controllerName () + "-" + thread .getId ());
404- PostExecutionControl <R > postExecutionControl =
404+ PostExecutionControl <P > postExecutionControl =
405405 reconciliationDispatcher .handleExecution (executionScope );
406406 eventProcessingFinished (executionScope , postExecutionControl );
407407 } finally {
0 commit comments