1616
1717package org .dataloader ;
1818
19- import org .dataloader .impl .FutureKit ;
19+ import org .dataloader .impl .CompletableFutureKit ;
2020
2121import java .util .ArrayList ;
2222import java .util .Collections ;
2323import java .util .LinkedHashMap ;
2424import java .util .List ;
25- import java .util .Objects ;
25+ import java .util .Map ;
2626import java .util .concurrent .CompletableFuture ;
27- import java .util .concurrent .atomic .AtomicInteger ;
2827import java .util .stream .Collectors ;
2928
3029import static org .dataloader .impl .Assertions .assertState ;
30+ import static org .dataloader .impl .Assertions .nonNull ;
3131
3232/**
3333 * Data loader is a utility class that allows batch loading of data that is identified by a set of unique keys. For
@@ -54,8 +54,7 @@ public class DataLoader<K, V> {
5454 private final BatchLoader <K , V > batchLoadFunction ;
5555 private final DataLoaderOptions loaderOptions ;
5656 private final CacheMap <Object , CompletableFuture <V >> futureCache ;
57- private final LinkedHashMap <K , CompletableFuture <V >> loaderQueue ;
58- private final LinkedHashMap <PromisedValues , LinkedHashMap <K , CompletableFuture <V >>> dispatchedQueues ;
57+ private final Map <K , CompletableFuture <V >> loaderQueue ;
5958
6059 /**
6160 * Creates a new data loader with the provided batch load function, and default options.
@@ -72,14 +71,17 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction) {
7271 * @param batchLoadFunction the batch load function to use
7372 * @param options the batch load options
7473 */
75- @ SuppressWarnings ("unchecked" )
7674 public DataLoader (BatchLoader <K , V > batchLoadFunction , DataLoaderOptions options ) {
77- Objects .requireNonNull (batchLoadFunction , "Batch load function cannot be null" );
78- this .batchLoadFunction = batchLoadFunction ;
75+ this .batchLoadFunction = nonNull (batchLoadFunction );
7976 this .loaderOptions = options == null ? new DataLoaderOptions () : options ;
80- this .futureCache = loaderOptions .cacheMap ().isPresent () ? (CacheMap <Object , CompletableFuture <V >>) loaderOptions .cacheMap ().get () : CacheMap .simpleMap ();
81- this .loaderQueue = new LinkedHashMap <>();
82- this .dispatchedQueues = new LinkedHashMap <>();
77+ this .futureCache = determineCacheMap (loaderOptions );
78+ // order of keys matter in data loader
79+ this .loaderQueue = Collections .synchronizedMap (new LinkedHashMap <>());
80+ }
81+
82+ @ SuppressWarnings ("unchecked" )
83+ private CacheMap <Object , CompletableFuture <V >> determineCacheMap (DataLoaderOptions loaderOptions ) {
84+ return loaderOptions .cacheMap ().isPresent () ? (CacheMap <Object , CompletableFuture <V >>) loaderOptions .cacheMap ().get () : CacheMap .simpleMap ();
8385 }
8486
8587 /**
@@ -94,13 +96,12 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options
9496 * @return the future of the value
9597 */
9698 public CompletableFuture <V > load (K key ) {
97- Objects .requireNonNull (key , "Key cannot be null" );
98- Object cacheKey = getCacheKey (key );
99+ Object cacheKey = getCacheKey (nonNull (key ));
99100 if (loaderOptions .cachingEnabled () && futureCache .containsKey (cacheKey )) {
100101 return futureCache .get (cacheKey );
101102 }
102103
103- CompletableFuture <V > future = FutureKit . future ();
104+ CompletableFuture <V > future = new CompletableFuture <> ();
104105 if (loaderOptions .batchingEnabled ()) {
105106 loaderQueue .put (key , future );
106107 } else {
@@ -130,7 +131,9 @@ public CompletableFuture<V> load(K key) {
130131 * @return the composite future of the list of values
131132 */
132133 public PromisedValues <V > loadMany (List <K > keys ) {
133- return PromisedValues .allOf (keys .stream ().map (this ::load ).collect (Collectors .toList ()));
134+ return PromisedValues .allOf (keys .stream ()
135+ .map (this ::load )
136+ .collect (Collectors .toList ()));
134137 }
135138
136139 /**
@@ -141,30 +144,45 @@ public PromisedValues<V> loadMany(List<K> keys) {
141144 * @return the composite future of the queued load requests
142145 */
143146 public PromisedValues <V > dispatch () {
144- if (!loaderOptions .batchingEnabled () || loaderQueue .size () == 0 ) {
145- return PromisedValues .allOf (Collections .emptyList ());
146- }
147- List <K > keys = new ArrayList <>(loaderQueue .keySet ());
148- PromisedValues <V > batch = batchLoadFunction .load (keys );
149-
150- assertState (keys .size () == batch .size (), "The size of the promised values MUST be the same size as the key list" );
151-
152- dispatchedQueues .put (batch , new LinkedHashMap <>(loaderQueue ));
153- batch .thenAccept (rh -> {
154- AtomicInteger index = new AtomicInteger (0 );
155- dispatchedQueues .get (batch ).forEach ((key , future ) -> {
156- int idx = index .get ();
157- if (batch .succeeded (idx )) {
158- future .complete (batch .get (idx ));
159- } else {
160- future .completeExceptionally (batch .cause (idx ));
147+ synchronized (loaderQueue ) {
148+ if (!loaderOptions .batchingEnabled () || loaderQueue .size () == 0 ) {
149+ return PromisedValues .allOf (Collections .emptyList ());
150+ }
151+ //
152+ // order of keys -> values matter in data loader hence the use of linked hash map
153+ //
154+ // See https://github.com/facebook/dataloader/blob/master/README.md for more details
155+ //
156+ List <K > keys = new ArrayList <>(loaderQueue .keySet ());
157+ List <CompletableFuture <V >> futureList = keys .stream ()
158+ .map (loaderQueue ::get )
159+ .collect (Collectors .toList ());
160+
161+ PromisedValues <V > batchOfPromisedValues = batchLoadFunction .load (keys );
162+
163+ assertState (keys .size () == batchOfPromisedValues .size (), "The size of the promised values MUST be the same size as the key list" );
164+
165+ //
166+ // when the promised list of values completes, we transfer the values into
167+ // the previously cached future objects that client already has been given
168+ // via calls to load("foo") and loadMany("foo")
169+ //
170+ batchOfPromisedValues .thenAccept (promisedValues -> {
171+ for (int idx = 0 ; idx < futureList .size (); idx ++) {
172+ CompletableFuture <V > future = futureList .get (idx );
173+ if (promisedValues .succeeded (idx )) {
174+ V value = promisedValues .get (idx );
175+ future .complete (value );
176+ } else {
177+ Throwable cause = promisedValues .cause (idx );
178+ future .completeExceptionally (cause );
179+ }
161180 }
162- index .incrementAndGet ();
163181 });
164- dispatchedQueues . remove ( batch );
165- } );
166- loaderQueue . clear () ;
167- return batch ;
182+
183+ loaderQueue . clear ( );
184+ return batchOfPromisedValues ;
185+ }
168186 }
169187
170188 /**
@@ -218,7 +236,7 @@ public DataLoader<K, V> prime(K key, V value) {
218236 public DataLoader <K , V > prime (K key , Exception error ) {
219237 Object cacheKey = getCacheKey (key );
220238 if (!futureCache .containsKey (cacheKey )) {
221- futureCache .set (cacheKey , FutureKit .failedFuture (error ));
239+ futureCache .set (cacheKey , CompletableFutureKit .failedFuture (error ));
222240 }
223241 return this ;
224242 }
0 commit comments