diff --git a/src/main/java/org/dataloader/CacheMap.java b/src/main/java/org/dataloader/CacheMap.java
index b3929af..3e6d895 100644
--- a/src/main/java/org/dataloader/CacheMap.java
+++ b/src/main/java/org/dataloader/CacheMap.java
@@ -28,7 +28,13 @@
* CacheMap is used by data loaders that use caching promises to values aka {@link CompletableFuture}<V>. A better name for this
* class might have been FutureCache but that is history now.
*
- * The default implementation used by the data loader is based on a {@link java.util.LinkedHashMap}.
+ * The default implementation used by the data loader is based on a {@link java.util.concurrent.ConcurrentHashMap} because
+ * the data loader code requires the cache to provide atomic writes especially the {@link #putIfAbsentAtomically(Object, CompletableFuture)}
+ * method.
+ *
+ * The data loader code uses a Compare and Swap (CAS) algorithm to decide if a cache entry is present or not. If you write your
+ * own {@link CacheMap} implementation then you MUST provide atomic writes in this method to ensure that the same promise for a key is
+ * returned always.
*
* This is really a cache of completed {@link CompletableFuture}<V> values in memory. It is used, when caching is enabled, to
* give back the same future to any code that may call it. If you need a cache of the underlying values that is possible external to the JVM
@@ -42,7 +48,7 @@
*/
@PublicSpi
@NullMarked
-public interface CacheMap {
+public interface CacheMap {
/**
* Creates a new cache map, using the default implementation that is based on a {@link java.util.LinkedHashMap}.
@@ -84,14 +90,21 @@ static CacheMap simpleMap() {
Collection> getAll();
/**
- * Creates a new cache map entry with the specified key and value, or updates the value if the key already exists.
+ * Atomically creates a new cache map entry with the specified key and value, or updates the value if the key already exists.
+ *
+ * The data loader code uses a Compare and Swap (CAS) algorithm to decide if a cache entry is present or not. If you write your
+ * own {@link CacheMap} implementation then you MUST provide atomic writes in this method to ensure that the same promise for a key is
+ * returned always.
+ *
+ * The default implementation of this method uses {@link java.util.concurrent.ConcurrentHashMap} as its implementation so these CAS
+ * writes work as expected.
*
* @param key the key to cache
* @param value the value to cache
*
- * @return the cache map for fluent coding
+ * @return the previous value for the key, set atomically, or null if no value was present.
*/
- CompletableFuture putIfAbsentAtomically(K key, CompletableFuture value);
+ @Nullable CompletableFuture putIfAbsentAtomically(K key, CompletableFuture value);
/**
* Deletes the entry with the specified key from the cache map, if it exists.
@@ -114,7 +127,7 @@ static CacheMap simpleMap() {
* and intended for testing and debugging.
* If a cache doesn't support it, it can throw an Exception.
*
- * @return
+ * @return the size of the cache
*/
int size();
}
diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java
index 249c1f2..feb6184 100644
--- a/src/main/java/org/dataloader/DataLoaderHelper.java
+++ b/src/main/java/org/dataloader/DataLoaderHelper.java
@@ -49,7 +49,7 @@
@Internal
class DataLoaderHelper {
- static class LoaderQueueEntry {
+ private static class LoaderQueueEntry {
final K key;
final CompletableFuture value;
@@ -155,11 +155,8 @@ CompletableFuture load(K key, Object loadContext) {
try {
CompletableFuture cachedFuture = futureCache.get(cacheKey);
if (cachedFuture != null) {
- // We already have a promise for this key, no need to check value cache or queue up load
- stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
- ctx.onDispatched();
- cachedFuture.whenComplete(ctx::onCompleted);
- return cachedFuture;
+ // We already have a promise for this key, no need to check value cache or queue this load
+ return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture);
}
} catch (Exception ignored) {
}
@@ -170,11 +167,8 @@ CompletableFuture load(K key, Object loadContext) {
if (futureCachingEnabled) {
CompletableFuture cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture);
if (cachedFuture != null) {
- // another thread was faster and created a matching CF ... hence this is really a cachehit and we are done
- stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
- ctx.onDispatched();
- cachedFuture.whenComplete(ctx::onCompleted);
- return cachedFuture;
+ // another thread was faster and created a matching CF ... hence this is really a cache hit and we are done
+ return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture);
}
}
addEntryToLoaderQueue(key, loadCallFuture, loadContext);
@@ -186,12 +180,9 @@ CompletableFuture load(K key, Object loadContext) {
CompletableFuture cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture);
if (cachedFuture != null) {
// another thread was faster and the loader was invoked twice with the same key
- // we are disregarding the resul of our dispatch call and use the already cached value
+ // we are disregarding the result of our dispatch call and use the already cached value
// meaning this is a cache hit and we are done
- stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
- ctx.onDispatched();
- cachedFuture.whenComplete(ctx::onCompleted);
- return cachedFuture;
+ return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture);
}
}
}
@@ -201,6 +192,13 @@ CompletableFuture load(K key, Object loadContext) {
return loadCallFuture;
}
+ private CompletableFuture incrementCacheHitAndReturnCF(DataLoaderInstrumentationContext