55import org .dataloader .annotations .ExperimentalApi ;
66
77import java .time .Duration ;
8- import java .util .HashMap ;
8+ import java .util .LinkedHashMap ;
99import java .util .Map ;
10+ import java .util .concurrent .ConcurrentHashMap ;
1011import java .util .concurrent .Executors ;
1112import java .util .concurrent .ScheduledExecutorService ;
1213import java .util .concurrent .TimeUnit ;
1314
1415import static org .dataloader .impl .Assertions .nonNull ;
1516
1617/**
17- * This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called
18- * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false,
19- * then a task is scheduled to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
18+ * This {@link DataLoaderRegistry} will use {@link DispatchPredicate}s when {@link #dispatchAll()} is called
19+ * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled
20+ * to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
21+ * <p>
22+ * It;s possible to have a {@link DispatchPredicate} per dataloader as well as a default {@link DispatchPredicate} for the
23+ * whole {@link ScheduledDataLoaderRegistry}.
24+ * <p>
25+ * This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
26+ * no rescheduling will occur, and you will need to call dispatch again to restart the process.
2027 * <p>
2128 * In the default mode, when {@link #tickerMode} is false, the registry will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
2229 * no rescheduling will occur, and you will need to call dispatch again to restart the process.
3542 * <p>
3643 * When {@link #tickerMode} is true, you really SHOULD close the registry say at the end of a request otherwise you will leave a job
3744 * on the {@link ScheduledExecutorService} that is continuously dispatching.
38- * <p>
39- * If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
45+ * <p> * If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
4046 * call {@link #rescheduleNow()}.
4147 * <p>
4248 * By default, it uses a {@link Executors#newSingleThreadScheduledExecutor()}} to schedule the tasks. However, if you
4854@ ExperimentalApi
4955public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {
5056
51- private final ScheduledExecutorService scheduledExecutorService ;
57+ private final Map < DataLoader <?, ?>, DispatchPredicate > dataLoaderPredicates = new ConcurrentHashMap <>() ;
5258 private final DispatchPredicate dispatchPredicate ;
59+ private final ScheduledExecutorService scheduledExecutorService ;
5360 private final Duration schedule ;
5461 private final boolean tickerMode ;
5562 private volatile boolean closed ;
5663
5764 private ScheduledDataLoaderRegistry (Builder builder ) {
65+ super ();
5866 this .dataLoaders .putAll (builder .dataLoaders );
5967 this .scheduledExecutorService = builder .scheduledExecutorService ;
60- this .dispatchPredicate = builder .dispatchPredicate ;
6168 this .schedule = builder .schedule ;
6269 this .tickerMode = builder .tickerMode ;
6370 this .closed = false ;
71+ this .dispatchPredicate = builder .dispatchPredicate ;
72+ this .dataLoaderPredicates .putAll (builder .dataLoaderPredicates );
6473 }
6574
6675 /**
@@ -85,6 +94,88 @@ public boolean isTickerMode() {
8594 return tickerMode ;
8695 }
8796
97+ /**
98+ * This will combine all the current data loaders in this registry and all the data loaders from the specified registry
99+ * and return a new combined registry
100+ *
101+ * @param registry the registry to combine into this registry
102+ *
103+ * @return a new combined registry
104+ */
105+ public ScheduledDataLoaderRegistry combine (DataLoaderRegistry registry ) {
106+ Builder combinedBuilder = ScheduledDataLoaderRegistry .newScheduledRegistry ()
107+ .dispatchPredicate (this .dispatchPredicate );
108+ combinedBuilder .registerAll (this );
109+ combinedBuilder .registerAll (registry );
110+ return combinedBuilder .build ();
111+ }
112+
113+
114+ /**
115+ * This will unregister a new dataloader
116+ *
117+ * @param key the key of the data loader to unregister
118+ *
119+ * @return this registry
120+ */
121+ public ScheduledDataLoaderRegistry unregister (String key ) {
122+ DataLoader <?, ?> dataLoader = dataLoaders .remove (key );
123+ if (dataLoader != null ) {
124+ dataLoaderPredicates .remove (dataLoader );
125+ }
126+ return this ;
127+ }
128+
129+ /**
130+ * @return a map of data loaders to specific dispatch predicates
131+ */
132+ public Map <DataLoader <?, ?>, DispatchPredicate > getDataLoaderPredicates () {
133+ return new LinkedHashMap <>(dataLoaderPredicates );
134+ }
135+
136+ /**
137+ * There is a default predicate that applies to the whole {@link ScheduledDataLoaderRegistry}
138+ *
139+ * @return the default dispatch predicate
140+ */
141+ public DispatchPredicate getDispatchPredicate () {
142+ return dispatchPredicate ;
143+ }
144+
145+ /**
146+ * This will register a new dataloader and dispatch predicate associated with that data loader
147+ *
148+ * @param key the key to put the data loader under
149+ * @param dataLoader the data loader to register
150+ * @param dispatchPredicate the dispatch predicate to associate with this data loader
151+ *
152+ * @return this registry
153+ */
154+ public ScheduledDataLoaderRegistry register (String key , DataLoader <?, ?> dataLoader , DispatchPredicate dispatchPredicate ) {
155+ dataLoaders .put (key , dataLoader );
156+ dataLoaderPredicates .put (dataLoader , dispatchPredicate );
157+ return this ;
158+ }
159+
160+ /**
161+ * Returns true if the dataloader has a predicate which returned true, OR the overall
162+ * registry predicate returned true.
163+ *
164+ * @param dataLoaderKey the key in the dataloader map
165+ * @param dataLoader the dataloader
166+ *
167+ * @return true if it should dispatch
168+ */
169+ private boolean shouldDispatch (String dataLoaderKey , DataLoader <?, ?> dataLoader ) {
170+ DispatchPredicate dispatchPredicate = dataLoaderPredicates .get (dataLoader );
171+ if (dispatchPredicate != null ) {
172+ if (dispatchPredicate .test (dataLoaderKey , dataLoader )) {
173+ return true ;
174+ }
175+ }
176+ return this .dispatchPredicate .test (dataLoaderKey , dataLoader );
177+ }
178+
88179 @ Override
89180 public void dispatchAll () {
90181 dispatchAllWithCount ();
@@ -101,24 +192,28 @@ public int dispatchAllWithCount() {
101192 return sum ;
102193 }
103194
195+
104196 /**
105197 * This will immediately dispatch the {@link DataLoader}s in the registry
106- * without testing the predicate
198+ * without testing the predicates
107199 */
108200 public void dispatchAllImmediately () {
109- super . dispatchAll ();
201+ dispatchAllWithCountImmediately ();
110202 }
111203
112204 /**
113205 * This will immediately dispatch the {@link DataLoader}s in the registry
114- * without testing the predicate
206+ * without testing the predicates
115207 *
116208 * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
117209 */
118210 public int dispatchAllWithCountImmediately () {
119- return super .dispatchAllWithCount ();
211+ return dataLoaders .values ().stream ()
212+ .mapToInt (dataLoader -> dataLoader .dispatchWithCounts ().getKeysCount ())
213+ .sum ();
120214 }
121215
216+
122217 /**
123218 * This will schedule a task to check the predicate and dispatch if true right now. It will not do
124219 * a pre check of the preodicate like {@link #dispatchAll()} would
@@ -136,7 +231,7 @@ private void reschedule(String key, DataLoader<?, ?> dataLoader) {
136231
137232 private int dispatchOrReschedule (String key , DataLoader <?, ?> dataLoader ) {
138233 int sum = 0 ;
139- boolean shouldDispatch = dispatchPredicate . test (key , dataLoader );
234+ boolean shouldDispatch = shouldDispatch (key , dataLoader );
140235 if (shouldDispatch ) {
141236 sum = dataLoader .dispatchWithCounts ().getKeysCount ();
142237 }
@@ -147,8 +242,8 @@ private int dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
147242 }
148243
149244 /**
150- * By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
151- * and a schedule duration of 10 milli seconds .
245+ * By default, this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
246+ * and a schedule duration of 10 milliseconds .
152247 *
153248 * @return A builder of {@link ScheduledDataLoaderRegistry}s
154249 */
@@ -158,10 +253,11 @@ public static Builder newScheduledRegistry() {
158253
159254 public static class Builder {
160255
256+ private final Map <String , DataLoader <?, ?>> dataLoaders = new LinkedHashMap <>();
257+ private final Map <DataLoader <?, ?>, DispatchPredicate > dataLoaderPredicates = new LinkedHashMap <>();
258+ private DispatchPredicate dispatchPredicate = DispatchPredicate .DISPATCH_ALWAYS ;
161259 private ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor ();
162- private DispatchPredicate dispatchPredicate = (key , dl ) -> true ;
163260 private Duration schedule = Duration .ofMillis (10 );
164- private final Map <String , DataLoader <?, ?>> dataLoaders = new HashMap <>();
165261 private boolean tickerMode = false ;
166262
167263 public Builder scheduledExecutorService (ScheduledExecutorService executorService ) {
@@ -174,11 +270,6 @@ public Builder schedule(Duration schedule) {
174270 return this ;
175271 }
176272
177- public Builder dispatchPredicate (DispatchPredicate dispatchPredicate ) {
178- this .dispatchPredicate = nonNull (dispatchPredicate );
179- return this ;
180- }
181-
182273 /**
183274 * This will register a new dataloader
184275 *
@@ -192,8 +283,24 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
192283 return this ;
193284 }
194285
286+
287+ /**
288+ * This will register a new dataloader with a specific {@link DispatchPredicate}
289+ *
290+ * @param key the key to put the data loader under
291+ * @param dataLoader the data loader to register
292+ * @param dispatchPredicate the dispatch predicate
293+ *
294+ * @return this builder for a fluent pattern
295+ */
296+ public Builder register (String key , DataLoader <?, ?> dataLoader , DispatchPredicate dispatchPredicate ) {
297+ register (key , dataLoader );
298+ dataLoaderPredicates .put (dataLoader , dispatchPredicate );
299+ return this ;
300+ }
301+
195302 /**
196- * This will combine together the data loaders in this builder with the ones
303+ * This will combine the data loaders in this builder with the ones
197304 * from a previous {@link DataLoaderRegistry}
198305 *
199306 * @param otherRegistry the previous {@link DataLoaderRegistry}
@@ -202,6 +309,23 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
202309 */
203310 public Builder registerAll (DataLoaderRegistry otherRegistry ) {
204311 dataLoaders .putAll (otherRegistry .getDataLoadersMap ());
312+ if (otherRegistry instanceof ScheduledDataLoaderRegistry ) {
313+ ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry ) otherRegistry ;
314+ dataLoaderPredicates .putAll (other .dataLoaderPredicates );
315+ }
316+ return this ;
317+ }
318+
319+ /**
320+ * This sets a default predicate on the {@link DataLoaderRegistry} that will control
321+ * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched.
322+ *
323+ * @param dispatchPredicate the predicate
324+ *
325+ * @return this builder for a fluent pattern
326+ */
327+ public Builder dispatchPredicate (DispatchPredicate dispatchPredicate ) {
328+ this .dispatchPredicate = dispatchPredicate ;
205329 return this ;
206330 }
207331
0 commit comments