55import org .dataloader .annotations .ExperimentalApi ;
66
77import java .time .Duration ;
8- import java .util .HashMap ;
8+ import java .util .LinkedHashMap ;
99import java .util .Map ;
10+ import java .util .concurrent .ConcurrentHashMap ;
1011import java .util .concurrent .Executors ;
1112import java .util .concurrent .ScheduledExecutorService ;
1213import java .util .concurrent .TimeUnit ;
1314
1415import static org .dataloader .impl .Assertions .nonNull ;
1516
1617/**
17- * This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called
18+ * This {@link DataLoaderRegistry} will use {@link DispatchPredicate}s when {@link #dispatchAll()} is called
1819 * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled
1920 * to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
2021 * <p>
22+ * It;s possible to have a {@link DispatchPredicate} per dataloader as well as a default {@link DispatchPredicate} for the
23+ * whole {@link ScheduledDataLoaderRegistry}.
24+ * <p>
2125 * This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
22- * no rescheduling will occur and you will need to call dispatch again to restart the process.
26+ * no rescheduling will occur, and you will need to call dispatch again to restart the process.
2327 * <p>
2428 * If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
2529 * call {@link #rescheduleNow()}.
2933@ ExperimentalApi
3034public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {
3135
32- private final ScheduledExecutorService scheduledExecutorService ;
36+ private final Map < DataLoader <?, ?>, DispatchPredicate > dataLoaderPredicates = new ConcurrentHashMap <>() ;
3337 private final DispatchPredicate dispatchPredicate ;
38+ private final ScheduledExecutorService scheduledExecutorService ;
3439 private final Duration schedule ;
3540 private volatile boolean closed ;
3641
3742 private ScheduledDataLoaderRegistry (Builder builder ) {
43+ super ();
3844 this .dataLoaders .putAll (builder .dataLoaders );
3945 this .scheduledExecutorService = builder .scheduledExecutorService ;
40- this .dispatchPredicate = builder .dispatchPredicate ;
4146 this .schedule = builder .schedule ;
4247 this .closed = false ;
48+ this .dispatchPredicate = builder .dispatchPredicate ;
49+ this .dataLoaderPredicates .putAll (builder .dataLoaderPredicates );
4350 }
4451
4552 /**
@@ -57,6 +64,88 @@ public Duration getScheduleDuration() {
5764 return schedule ;
5865 }
5966
67+ /**
68+ * This will combine all the current data loaders in this registry and all the data loaders from the specified registry
69+ * and return a new combined registry
70+ *
71+ * @param registry the registry to combine into this registry
72+ *
73+ * @return a new combined registry
74+ */
75+ public ScheduledDataLoaderRegistry combine (DataLoaderRegistry registry ) {
76+ Builder combinedBuilder = ScheduledDataLoaderRegistry .newScheduledRegistry ()
77+ .dispatchPredicate (this .dispatchPredicate );
78+ combinedBuilder .registerAll (this );
79+ combinedBuilder .registerAll (registry );
80+ return combinedBuilder .build ();
81+ }
82+
83+
84+ /**
85+ * This will unregister a new dataloader
86+ *
87+ * @param key the key of the data loader to unregister
88+ *
89+ * @return this registry
90+ */
91+ public ScheduledDataLoaderRegistry unregister (String key ) {
92+ DataLoader <?, ?> dataLoader = dataLoaders .remove (key );
93+ if (dataLoader != null ) {
94+ dataLoaderPredicates .remove (dataLoader );
95+ }
96+ return this ;
97+ }
98+
99+ /**
100+ * @return a map of data loaders to specific dispatch predicates
101+ */
102+ public Map <DataLoader <?, ?>, DispatchPredicate > getDataLoaderPredicates () {
103+ return new LinkedHashMap <>(dataLoaderPredicates );
104+ }
105+
106+ /**
107+ * There is a default predicate that applies to the whole {@link ScheduledDataLoaderRegistry}
108+ *
109+ * @return the default dispatch predicate
110+ */
111+ public DispatchPredicate getDispatchPredicate () {
112+ return dispatchPredicate ;
113+ }
114+
115+ /**
116+ * This will register a new dataloader and dispatch predicate associated with that data loader
117+ *
118+ * @param key the key to put the data loader under
119+ * @param dataLoader the data loader to register
120+ * @param dispatchPredicate the dispatch predicate to associate with this data loader
121+ *
122+ * @return this registry
123+ */
124+ public ScheduledDataLoaderRegistry register (String key , DataLoader <?, ?> dataLoader , DispatchPredicate dispatchPredicate ) {
125+ dataLoaders .put (key , dataLoader );
126+ dataLoaderPredicates .put (dataLoader , dispatchPredicate );
127+ return this ;
128+ }
129+
130+ /**
131+ * Returns true if the dataloader has a predicate which returned true, OR the overall
132+ * registry predicate returned true.
133+ *
134+ * @param dataLoaderKey the key in the dataloader map
135+ * @param dataLoader the dataloader
136+ *
137+ * @return true if it should dispatch
138+ */
139+ private boolean shouldDispatch (String dataLoaderKey , DataLoader <?, ?> dataLoader ) {
140+ DispatchPredicate dispatchPredicate = dataLoaderPredicates .get (dataLoader );
141+ if (dispatchPredicate != null ) {
142+ if (dispatchPredicate .test (dataLoaderKey , dataLoader )) {
143+ return true ;
144+ }
145+ }
146+ return this .dispatchPredicate .test (dataLoaderKey , dataLoader );
147+ }
148+
60149 @ Override
61150 public void dispatchAll () {
62151 dispatchAllWithCount ();
@@ -68,7 +157,7 @@ public int dispatchAllWithCount() {
68157 for (Map .Entry <String , DataLoader <?, ?>> entry : dataLoaders .entrySet ()) {
69158 DataLoader <?, ?> dataLoader = entry .getValue ();
70159 String key = entry .getKey ();
71- if (dispatchPredicate . test (key , dataLoader )) {
160+ if (shouldDispatch (key , dataLoader )) {
72161 sum += dataLoader .dispatchWithCounts ().getKeysCount ();
73162 } else {
74163 reschedule (key , dataLoader );
@@ -77,24 +166,28 @@ public int dispatchAllWithCount() {
77166 return sum ;
78167 }
79168
169+
80170 /**
81171 * This will immediately dispatch the {@link DataLoader}s in the registry
82- * without testing the predicate
172+ * without testing the predicates
83173 */
84174 public void dispatchAllImmediately () {
85- super . dispatchAll ();
175+ dispatchAllWithCountImmediately ();
86176 }
87177
88178 /**
89179 * This will immediately dispatch the {@link DataLoader}s in the registry
90- * without testing the predicate
180+ * without testing the predicates
91181 *
92182 * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
93183 */
94184 public int dispatchAllWithCountImmediately () {
95- return super .dispatchAllWithCount ();
185+ return dataLoaders .values ().stream ()
186+ .mapToInt (dataLoader -> dataLoader .dispatchWithCounts ().getKeysCount ())
187+ .sum ();
96188 }
97189
190+
98191 /**
99192 * This will schedule a task to check the predicate and dispatch if true right now. It will not do
100193 * a pre check of the preodicate like {@link #dispatchAll()} would
@@ -111,16 +204,16 @@ private void reschedule(String key, DataLoader<?, ?> dataLoader) {
111204 }
112205
113206 private void dispatchOrReschedule (String key , DataLoader <?, ?> dataLoader ) {
114- if (dispatchPredicate . test (key , dataLoader )) {
207+ if (shouldDispatch (key , dataLoader )) {
115208 dataLoader .dispatch ();
116209 } else {
117210 reschedule (key , dataLoader );
118211 }
119212 }
120213
121214 /**
122- * By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
123- * and a schedule duration of 10 milli seconds .
215+ * By default, this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
216+ * and a schedule duration of 10 milliseconds .
124217 *
125218 * @return A builder of {@link ScheduledDataLoaderRegistry}s
126219 */
@@ -130,10 +223,11 @@ public static Builder newScheduledRegistry() {
130223
131224 public static class Builder {
132225
226+ private final Map <String , DataLoader <?, ?>> dataLoaders = new LinkedHashMap <>();
227+ private final Map <DataLoader <?, ?>, DispatchPredicate > dataLoaderPredicates = new LinkedHashMap <>();
228+ private DispatchPredicate dispatchPredicate = DispatchPredicate .DISPATCH_ALWAYS ;
133229 private ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor ();
134- private DispatchPredicate dispatchPredicate = (key , dl ) -> true ;
135230 private Duration schedule = Duration .ofMillis (10 );
136- private final Map <String , DataLoader <?, ?>> dataLoaders = new HashMap <>();
137231
138232 public Builder scheduledExecutorService (ScheduledExecutorService executorService ) {
139233 this .scheduledExecutorService = nonNull (executorService );
@@ -145,11 +239,6 @@ public Builder schedule(Duration schedule) {
145239 return this ;
146240 }
147241
148- public Builder dispatchPredicate (DispatchPredicate dispatchPredicate ) {
149- this .dispatchPredicate = nonNull (dispatchPredicate );
150- return this ;
151- }
152-
153242 /**
154243 * This will register a new dataloader
155244 *
@@ -163,8 +252,24 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
163252 return this ;
164253 }
165254
255+
166256 /**
167- * This will combine together the data loaders in this builder with the ones
257+ * This will register a new dataloader with a specific {@link DispatchPredicate}
258+ *
259+ * @param key the key to put the data loader under
260+ * @param dataLoader the data loader to register
261+ * @param dispatchPredicate the dispatch predicate
262+ *
263+ * @return this builder for a fluent pattern
264+ */
265+ public Builder register (String key , DataLoader <?, ?> dataLoader , DispatchPredicate dispatchPredicate ) {
266+ register (key , dataLoader );
267+ dataLoaderPredicates .put (dataLoader , dispatchPredicate );
268+ return this ;
269+ }
270+
271+ /**
272+ * This will combine the data loaders in this builder with the ones
168273 * from a previous {@link DataLoaderRegistry}
169274 *
170275 * @param otherRegistry the previous {@link DataLoaderRegistry}
@@ -173,6 +278,23 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
173278 */
174279 public Builder registerAll (DataLoaderRegistry otherRegistry ) {
175280 dataLoaders .putAll (otherRegistry .getDataLoadersMap ());
281+ if (otherRegistry instanceof ScheduledDataLoaderRegistry ) {
282+ ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry ) otherRegistry ;
283+ dataLoaderPredicates .putAll (other .dataLoaderPredicates );
284+ }
285+ return this ;
286+ }
287+
288+ /**
289+ * This sets a default predicate on the {@link DataLoaderRegistry} that will control
290+ * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched.
291+ *
292+ * @param dispatchPredicate the predicate
293+ *
294+ * @return this builder for a fluent pattern
295+ */
296+ public Builder dispatchPredicate (DispatchPredicate dispatchPredicate ) {
297+ this .dispatchPredicate = dispatchPredicate ;
176298 return this ;
177299 }
178300
0 commit comments