1919import org .dataloader .impl .CompletableFutureKit ;
2020
2121import java .util .ArrayList ;
22- import java .util .Collections ;
22+ import java .util .Collection ;
2323import java .util .LinkedHashMap ;
2424import java .util .List ;
2525import java .util .Map ;
2626import java .util .concurrent .CompletableFuture ;
2727import java .util .stream .Collectors ;
2828
2929import static java .util .Collections .emptyList ;
30+ import static java .util .Collections .singletonList ;
3031import static org .dataloader .impl .Assertions .assertState ;
3132import static org .dataloader .impl .Assertions .nonNull ;
3233
@@ -98,8 +99,10 @@ private CacheMap<Object, CompletableFuture<V>> determineCacheMap(DataLoaderOptio
9899 */
99100 public CompletableFuture <V > load (K key ) {
100101 Object cacheKey = getCacheKey (nonNull (key ));
101- if (loaderOptions .cachingEnabled () && futureCache .containsKey (cacheKey )) {
102- return futureCache .get (cacheKey );
102+ synchronized (futureCache ) {
103+ if (loaderOptions .cachingEnabled () && futureCache .containsKey (cacheKey )) {
104+ return futureCache .get (cacheKey );
105+ }
103106 }
104107
105108 CompletableFuture <V > future = new CompletableFuture <>();
@@ -108,15 +111,17 @@ public CompletableFuture<V> load(K key) {
108111 loaderQueue .put (key , future );
109112 }
110113 } else {
111- // immediate execution of batch function (but not promise itself)
114+ // immediate execution of batch function
112115 CompletableFuture <List <V >> batchedLoad = batchLoadFunction
113- .load (Collections . singletonList (key ))
116+ .load (singletonList (key ))
114117 .toCompletableFuture ();
115118 future = batchedLoad
116119 .thenApply (list -> list .get (0 ));
117120 }
118121 if (loaderOptions .cachingEnabled ()) {
119- futureCache .set (cacheKey , future );
122+ synchronized (futureCache ) {
123+ futureCache .set (cacheKey , future );
124+ }
120125 }
121126 return future ;
122127 }
@@ -177,25 +182,64 @@ public CompletableFuture<List<V>> dispatch() {
177182 // the previously cached future objects that the client already has been given
178183 // via calls to load("foo") and loadMany(["foo","bar"])
179184 //
185+ int maxBatchSize = loaderOptions .maxBatchSize ();
186+ if (maxBatchSize > 0 && maxBatchSize < keys .size ()) {
187+ return sliceIntoBatchesOfBatches (keys , queuedFutures , maxBatchSize );
188+ } else {
189+ return dispatchQueueBatch (keys , queuedFutures );
190+ }
191+ }
192+
193+ private CompletableFuture <List <V >> sliceIntoBatchesOfBatches (List <K > keys , List <CompletableFuture <V >> queuedFutures , int maxBatchSize ) {
194+ // the number of keys is > than what the batch loader function can accept
195+ // so make multiple calls to the loader
196+ List <CompletableFuture <List <V >>> allBatches = new ArrayList <>();
197+ int len = keys .size ();
198+ int batchCount = (int ) Math .ceil (len / (double ) maxBatchSize );
199+ for (int i = 0 ; i < batchCount ; i ++) {
200+
201+ int fromIndex = i * maxBatchSize ;
202+ int toIndex = Math .min ((i + 1 ) * maxBatchSize , len );
203+
204+ List <K > subKeys = keys .subList (fromIndex , toIndex );
205+ List <CompletableFuture <V >> subFutures = queuedFutures .subList (fromIndex , toIndex );
206+
207+ allBatches .add (dispatchQueueBatch (subKeys , subFutures ));
208+ }
209+ //
210+ // now reassemble all the futures into one that is the complete set of results
211+ return CompletableFuture .allOf (allBatches .toArray (new CompletableFuture [allBatches .size ()]))
212+ .thenApply (v -> allBatches .stream ()
213+ .map (CompletableFuture ::join )
214+ .flatMap (Collection ::stream )
215+ .collect (Collectors .toList ()));
216+ }
217+
218+ private CompletableFuture <List <V >> dispatchQueueBatch (List <K > keys , List <CompletableFuture <V >> queuedFutures ) {
180219 return batchLoadFunction .load (keys )
181220 .toCompletableFuture ()
182221 .thenApply (values -> {
183222 assertState (keys .size () == values .size (), "The size of the promised values MUST be the same size as the key list" );
184223
185224 for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
186- V value = values .get (idx );
225+ Object value = values .get (idx );
187226 CompletableFuture <V > future = queuedFutures .get (idx );
188- future .complete (value );
227+ if (value instanceof Throwable ) {
228+ future .completeExceptionally ((Throwable ) value );
229+ } else {
230+ @ SuppressWarnings ("unchecked" )
231+ V val = (V ) value ;
232+ future .complete (val );
233+ }
189234 }
190235 return values ;
191236 }).exceptionally (ex -> {
192237 for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
193238 K key = keys .get (idx );
194239 CompletableFuture <V > future = queuedFutures .get (idx );
195- // clear any cached view of this key
196- futureCache .delete (key );
197240 future .completeExceptionally (ex );
198-
241+ // clear any cached view of this key
242+ clear (key );
199243 }
200244 return emptyList ();
201245 });
@@ -242,7 +286,9 @@ public int dispatchDepth() {
242286 */
243287 public DataLoader <K , V > clear (K key ) {
244288 Object cacheKey = getCacheKey (key );
245- futureCache .delete (cacheKey );
289+ synchronized (futureCache ) {
290+ futureCache .delete (cacheKey );
291+ }
246292 return this ;
247293 }
248294
@@ -252,7 +298,9 @@ public DataLoader<K, V> clear(K key) {
252298 * @return the data loader for fluent coding
253299 */
254300 public DataLoader <K , V > clearAll () {
255- futureCache .clear ();
301+ synchronized (futureCache ) {
302+ futureCache .clear ();
303+ }
256304 return this ;
257305 }
258306
@@ -266,8 +314,10 @@ public DataLoader<K, V> clearAll() {
266314 */
267315 public DataLoader <K , V > prime (K key , V value ) {
268316 Object cacheKey = getCacheKey (key );
269- if (!futureCache .containsKey (cacheKey )) {
270- futureCache .set (cacheKey , CompletableFuture .completedFuture (value ));
317+ synchronized (futureCache ) {
318+ if (!futureCache .containsKey (cacheKey )) {
319+ futureCache .set (cacheKey , CompletableFuture .completedFuture (value ));
320+ }
271321 }
272322 return this ;
273323 }
0 commit comments