11package io .javaoperatorsdk .operator .processing .event .source .polling ;
22
3+ import java .time .Duration ;
34import java .util .*;
4- import java .util .concurrent .ConcurrentHashMap ;
5+ import java .util .concurrent .* ;
56import java .util .function .Predicate ;
67
78import org .slf4j .Logger ;
@@ -32,8 +33,10 @@ public class PerResourcePollingEventSource<R, P extends HasMetadata>
3233
3334 private static final Logger log = LoggerFactory .getLogger (PerResourcePollingEventSource .class );
3435
35- private final Timer timer = new Timer ();
36- private final Map <ResourceID , TimerTask > timerTasks = new ConcurrentHashMap <>();
36+ public static final int DEFAULT_EXECUTOR_THREAD_NUMBER = 1 ;
37+
38+ private final ScheduledExecutorService executorService ;
39+ private final Map <ResourceID , ScheduledFuture <Void >> scheduledFutures = new ConcurrentHashMap <>();
3740 private final ResourceFetcher <R , P > resourceFetcher ;
3841 private final Cache <P > resourceCache ;
3942 private final Predicate <P > registerPredicate ;
@@ -57,11 +60,20 @@ public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
5760 Cache <P > resourceCache , long period ,
5861 Predicate <P > registerPredicate , Class <R > resourceClass ,
5962 CacheKeyMapper <R > cacheKeyMapper ) {
63+ this (resourceFetcher , resourceCache , period , registerPredicate , resourceClass , cacheKeyMapper ,
64+ new ScheduledThreadPoolExecutor (DEFAULT_EXECUTOR_THREAD_NUMBER ));
65+ }
66+
67+ public PerResourcePollingEventSource (ResourceFetcher <R , P > resourceFetcher ,
68+ Cache <P > resourceCache , long period ,
69+ Predicate <P > registerPredicate , Class <R > resourceClass ,
70+ CacheKeyMapper <R > cacheKeyMapper , ScheduledExecutorService executorService ) {
6071 super (resourceClass , cacheKeyMapper );
6172 this .resourceFetcher = resourceFetcher ;
6273 this .resourceCache = resourceCache ;
6374 this .period = period ;
6475 this .registerPredicate = registerPredicate ;
76+ this .executorService = executorService ;
6577 }
6678
6779 private Set <R > getAndCacheResource (P primary , boolean fromGetter ) {
@@ -71,6 +83,17 @@ private Set<R> getAndCacheResource(P primary, boolean fromGetter) {
7183 return values ;
7284 }
7385
86+ @ SuppressWarnings ("unchecked" )
87+ private void scheduleNextExecution (P primary , Set <R > actualResources ) {
88+ var primaryID = ResourceID .fromResource (primary );
89+ var fetchDelay = resourceFetcher .fetchDelay (actualResources , primary );
90+ var fetchDuration = fetchDelay .orElse (Duration .ofMillis (period ));
91+
92+ ScheduledFuture <Void > scheduledFuture = (ScheduledFuture <Void >) executorService
93+ .schedule (new FetchingExecutor (primaryID ), fetchDuration .toMillis (), TimeUnit .MILLISECONDS );
94+ scheduledFutures .put (primaryID , scheduledFuture );
95+ }
96+
7497 @ Override
7598 public void onResourceCreated (P resource ) {
7699 checkAndRegisterTask (resource );
@@ -84,41 +107,53 @@ public void onResourceUpdated(P newResource, P oldResource) {
84107 @ Override
85108 public void onResourceDeleted (P resource ) {
86109 var resourceID = ResourceID .fromResource (resource );
87- TimerTask task = timerTasks .remove (resourceID );
88- if (task != null ) {
89- log .debug ("Canceling task for resource: {}" , resource );
90- task .cancel ();
110+ var scheduledFuture = scheduledFutures .remove (resourceID );
111+ if (scheduledFuture != null ) {
112+ log .debug ("Canceling scheduledFuture for resource: {}" , resource );
113+ scheduledFuture .cancel (true );
91114 }
92115 handleDelete (resourceID );
93116 fetchedForPrimaries .remove (resourceID );
94117 }
95118
96119 // This method is always called from the same Thread for the same resource,
97120 // since events from ResourceEventAware are propagated from the thread of the informer. This is
98- // important
99- // because otherwise there will be a race condition related to the timerTasks.
121+ // important because otherwise there will be a race condition related to the timerTasks.
100122 private void checkAndRegisterTask (P resource ) {
101123 var primaryID = ResourceID .fromResource (resource );
102- if (timerTasks .get (primaryID ) == null && (registerPredicate == null
124+ if (scheduledFutures .get (primaryID ) == null && (registerPredicate == null
103125 || registerPredicate .test (resource ))) {
104- var task =
105- new TimerTask () {
106- @ Override
107- public void run () {
108- if (!isRunning ()) {
109- log .debug ("Event source not yet started. Will not run for: {}" , primaryID );
110- return ;
111- }
112- // always use up-to-date resource from cache
113- var res = resourceCache .get (primaryID );
114- res .ifPresentOrElse (p -> getAndCacheResource (p , false ),
115- () -> log .warn ("No resource in cache for resource ID: {}" , primaryID ));
116- }
117- };
118- timerTasks .put (primaryID , task );
119- // there is a delay, to not do two fetches when the resources first appeared
126+ var cachedResources = cache .get (primaryID );
127+ var actualResources =
128+ cachedResources == null ? null : new HashSet <>(cachedResources .values ());
129+ // note that there is a delay, to not do two fetches when the resources first appeared
120130 // and getSecondaryResource is called on reconciliation.
121- timer .schedule (task , period , period );
131+ scheduleNextExecution (resource , actualResources );
132+ }
133+ }
134+
135+ private class FetchingExecutor implements Runnable {
136+ private final ResourceID primaryID ;
137+
138+ public FetchingExecutor (ResourceID primaryID ) {
139+ this .primaryID = primaryID ;
140+ }
141+
142+ @ Override
143+ public void run () {
144+ if (!isRunning ()) {
145+ log .debug ("Event source not yet started. Will not run for: {}" , primaryID );
146+ return ;
147+ }
148+ // always use up-to-date resource from cache
149+ var primary = resourceCache .get (primaryID );
150+ if (primary .isEmpty ()) {
151+ log .warn ("No resource in cache for resource ID: {}" , primaryID );
152+ // no new execution is scheduled in this case, a on delete event should be received shortly
153+ } else {
154+ var actualResources = primary .map (p -> getAndCacheResource (p , false ));
155+ scheduleNextExecution (primary .get (), actualResources .orElse (null ));
156+ }
122157 }
123158 }
124159
@@ -146,12 +181,28 @@ public Set<R> getSecondaryResources(P primary) {
146181
147182 public interface ResourceFetcher <R , P > {
148183 Set <R > fetchResources (P primaryResource );
184+
185+ /**
186+ * By implementing this method it is possible to specify dynamic durations to wait between the
187+ * polls of the resources. This is especially handy if a resources "stabilized" so it is not
188+ * expected to change its state frequently. For example an AWS RDS instance is up and running,
189+ * it is expected to run and be stable for a very long time. In this case it is enough to poll
190+ * with a lower frequency, compared to the phase when it is being initialized.
191+ *
192+ * @param lastFetchedResource might be null, in case no fetch happened before. Empty set if
193+ * fetch happened but no resources were found.
194+ * @param primary related primary resource
195+ * @return an Optional containing the Duration to wait until the next fetch. If an empty
196+ * Optional is returned, the default polling period will be used.
197+ */
198+ default Optional <Duration > fetchDelay (Set <R > lastFetchedResource , P primary ) {
199+ return Optional .empty ();
200+ }
149201 }
150202
151203 @ Override
152204 public void stop () throws OperatorException {
153205 super .stop ();
154- timer . cancel ();
206+ executorService . shutdownNow ();
155207 }
156-
157208}
0 commit comments