From e4ad9679bc3c7223a7b00640e99db5b1e85b658c Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 22 Oct 2025 15:34:04 +0200 Subject: [PATCH 01/11] Prepare issue branch. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 0e3024e099..fe25245089 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 4.0.0-SNAPSHOT + 4.0.0-GH-3236-SNAPSHOT Spring Data Redis Spring Data module for Redis From 4d91adaaba8a280bd1a6625b6cd325d8abc078f1 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 22 Oct 2025 15:40:33 +0200 Subject: [PATCH 02/11] Apply non-blocking eviction when using Lettuce for `RedisCache`. clear and evict methods now use asynchronous and non-blocking removal of keys when using the Lettuce Redis driver. RedisCache also supports evictIfPresent to remove cache keys immediately which is a blocking method. --- .../data/redis/cache/BatchStrategies.java | 2 +- .../redis/cache/DefaultRedisCacheWriter.java | 147 ++++++++++++++++-- .../data/redis/cache/RedisCache.java | 17 +- .../data/redis/cache/RedisCacheWriter.java | 35 +++++ .../cache/DefaultRedisCacheWriterTests.java | 54 ++++++- .../redis/cache/LegacyRedisCacheTests.java | 5 + .../data/redis/cache/RedisCacheTests.java | 59 ++++++- 7 files changed, 298 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java b/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java index f84bc697aa..14f3922974 100644 --- a/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java +++ b/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java @@ -100,7 +100,7 @@ public long cleanCache(RedisConnection connection, String name, byte[] pattern) */ static class Scan implements BatchStrategy { - private final int batchSize; + final int batchSize; Scan(int batchSize) { this.batchSize = batchSize; diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index b18991d29d..bf2fd9a980 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -27,9 +27,12 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.jspecify.annotations.Nullable; + import org.springframework.dao.PessimisticLockingFailureException; +import org.springframework.data.redis.connection.ReactiveKeyCommands; import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.ReactiveStringCommands; @@ -37,6 +40,7 @@ import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.connection.RedisStringCommands.SetOption; +import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.types.Expiration; import org.springframework.data.redis.util.ByteUtils; import org.springframework.util.Assert; @@ -311,8 +315,20 @@ public void remove(String name, byte[] key) { Assert.notNull(name, "Name must not be null"); Assert.notNull(key, "Key must not be null"); - execute(name, connection -> connection.keyCommands().del(key)); + if (supportsAsyncRetrieve()) { + asyncCacheWriter.remove(name, key).thenRun(() -> statistics.incDeletes(name)); + } else { + removeIfPresent(name, key); + } + } + + @Override + public boolean removeIfPresent(String name, byte[] key) { + + Long removals = execute(name, connection -> connection.keyCommands().del(key)); statistics.incDeletes(name); + + return removals > 0; } @Override @@ -321,7 +337,22 @@ public void clean(String name, byte[] pattern) { Assert.notNull(name, "Name must not be null"); Assert.notNull(pattern, "Pattern must not be null"); - execute(name, connection -> { + if (supportsAsyncRetrieve()) { + asyncCacheWriter.clean(name, pattern, batchStrategy) + .thenAccept(deleteCount -> statistics.incDeletesBy(name, deleteCount.intValue())); + return; + } + + invalidate(name, pattern); + } + + @Override + public boolean invalidate(String name, byte[] pattern) { + + Assert.notNull(name, "Name must not be null"); + Assert.notNull(pattern, "Pattern must not be null"); + + return execute(name, connection -> { try { if (isLockingCacheWriter()) { @@ -337,13 +368,12 @@ public void clean(String name, byte[] pattern) { statistics.incDeletesBy(name, (int) deleteCount); + return deleteCount > 0; } finally { if (isLockingCacheWriter()) { doUnlock(name, connection); } } - - return "OK"; }); } @@ -499,6 +529,25 @@ interface AsyncCacheWriter { */ CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl); + /** + * Remove a cache entry asynchronously. + * + * @param name the cache name which to store the cache entry to. + * @param key the key for the cache entry. Must not be {@literal null}. + * @return a future that signals completion. + */ + CompletableFuture remove(String name, byte[] key); + + /** + * Clear the cache asynchronously. + * + * @param name the cache name which to store the cache entry to. + * @param pattern {@link String pattern} used to match Redis keys to clear. + * @param batchStrategy strategy to use. + * @return a future that signals completion emitting the number of removed keys. + */ + CompletableFuture clean(String name, byte[] pattern, BatchStrategy batchStrategy); + } /** @@ -524,6 +573,17 @@ public CompletableFuture retrieve(String name, byte[] key, @Nullable Dur public CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl) { throw new UnsupportedOperationException("async store not supported"); } + + @Override + public CompletableFuture remove(String name, byte[] key) { + throw new UnsupportedOperationException("async remove not supported"); + } + + @Override + public CompletableFuture clean(String name, byte[] pattern, BatchStrategy batchStrategy) { + throw new UnsupportedOperationException("async clean not supported"); + } + } /** @@ -534,6 +594,14 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul */ class AsynchronousCacheWriterDelegate implements AsyncCacheWriter { + private static final int DEFAULT_SCAN_BATCH_SIZE = 64; + private final int cleanBatchSize; + + public AsynchronousCacheWriterDelegate() { + this.cleanBatchSize = batchStrategy instanceof BatchStrategies.Scan scan ? scan.batchSize + : DEFAULT_SCAN_BATCH_SIZE; + } + @Override public boolean isSupported() { return true; @@ -561,20 +629,12 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul return doWithConnection(connection -> { - Mono mono = isLockingCacheWriter() ? doStoreWithLocking(name, key, value, ttl, connection) - : doStore(key, value, ttl, connection); + Mono mono = doWithLocking(name, key, value, connection, () -> doStore(key, value, ttl, connection)); return mono.then().toFuture(); }); } - private Mono doStoreWithLocking(String name, byte[] key, byte[] value, @Nullable Duration ttl, - ReactiveRedisConnection connection) { - - return Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection), - unused -> doUnlock(name, connection)); - } - @SuppressWarnings("NullAway") private Mono doStore(byte[] cacheKey, byte[] value, @Nullable Duration ttl, ReactiveRedisConnection connection) { @@ -590,6 +650,65 @@ private Mono doStore(byte[] cacheKey, byte[] value, @Nullable Duration } } + @Override + public CompletableFuture remove(String name, byte[] key) { + + return doWithConnection(connection -> { + + Mono mono = doWithLocking(name, key, null, connection, () -> doRemove(key, connection)); + + return mono.then().toFuture(); + }); + } + + @Override + public CompletableFuture clean(String name, byte[] pattern, BatchStrategy batchStrategy) { + + return doWithConnection(connection -> { + + Mono mono = doWithLocking(name, pattern, null, connection, () -> doClean(pattern, connection)); + + return mono.toFuture(); + }); + } + + private Mono doClean(byte[] pattern, ReactiveRedisConnection connection) { + + ReactiveKeyCommands commands = connection.keyCommands(); + + Flux keys; + + if (batchStrategy instanceof BatchStrategies.Keys) { + keys = commands.keys(ByteBuffer.wrap(pattern)).flatMapMany(Flux::fromIterable); + } else { + keys = commands.scan(ScanOptions.scanOptions().count(cleanBatchSize).match(pattern).build()); + } + + return keys + .buffer(cleanBatchSize) // + .flatMap(commands::mUnlink) // + .collect(Collectors.summingLong(Long::longValue)); + } + + @SuppressWarnings("NullAway") + private Mono doRemove(byte[] cacheKey, ReactiveRedisConnection connection) { + + ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey); + + return connection.keyCommands().unlink(wrappedKey); + } + + private Mono doWithLocking(String name, byte[] key, byte @Nullable [] value, + ReactiveRedisConnection connection, Supplier> action) { + + if (isLockingCacheWriter()) { + return Mono.usingWhen(doLock(name, key, value, connection), unused -> action.get(), + unused -> doUnlock(name, connection)); + } + + return action.get(); + } + private Mono doLock(String name, Object contextualKey, @Nullable Object contextualValue, ReactiveRedisConnection connection) { @@ -631,5 +750,7 @@ private CompletableFuture doWithConnection( ReactiveRedisConnection::closeLater) // .toFuture(); } + } + } diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCache.java b/src/main/java/org/springframework/data/redis/cache/RedisCache.java index a2ec676b5a..4ed84af24b 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCache.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCache.java @@ -235,9 +235,12 @@ public void clear() { } /** - * Clear keys that match the given {@link String keyPattern}. + * Clear keys that match the given {@link String keyPattern}. Useful when cache keys are formatted in a style where + * Redis patterns can be used for matching these. *

- * Useful when cache keys are formatted in a style where Redis patterns can be used for matching these. + * Actual clearing may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still + * seeing the entries. This may for example be the case with transactional cache decorators. Use {@link #invalidate()} + * for guaranteed immediate removal of entries. * * @param keyPattern {@link String pattern} used to match Redis keys to clear. * @since 3.0 @@ -246,6 +249,11 @@ public void clear(String keyPattern) { getCacheWriter().clean(getName(), createAndConvertCacheKey(keyPattern)); } + @Override + public boolean invalidate() { + return getCacheWriter().invalidate(getName(), createAndConvertCacheKey("*")); + } + /** * Reset all statistics counters and gauges for this cache. * @@ -260,6 +268,11 @@ public void evict(Object key) { getCacheWriter().remove(getName(), createAndConvertCacheKey(key)); } + @Override + public boolean evictIfPresent(Object key) { + return getCacheWriter().removeIfPresent(getName(), createAndConvertCacheKey(key)); + } + @Override public CompletableFuture retrieve(Object key) { diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java index ef63003afc..e3c2fdb64c 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java @@ -244,20 +244,54 @@ default CompletableFuture retrieve(String name, byte[] key) { /** * Remove the given key from Redis. + *

+ * Actual eviction may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still + * seeing the entry. * * @param name cache name must not be {@literal null}. * @param key key for the cache entry. Must not be {@literal null}. */ void remove(String name, byte[] key); + /** + * Remove the given key from Redis if it is present, expecting the key to be immediately invisible for subsequent + * lookups. + * + * @param name cache name must not be {@literal null}. + * @param key key for the cache entry. Must not be {@literal null}. + * @return {@code true} if the cache was known to have a mapping for this key before, {@code false} if it did not (or + * if prior presence could not be determined). + */ + default boolean removeIfPresent(String name, byte[] key) { + remove(name, key); + return false; + } + /** * Remove all keys following the given pattern. + *

+ * Actual clearing may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still + * seeing the entries. * * @param name cache name must not be {@literal null}. * @param pattern pattern for the keys to remove. Must not be {@literal null}. */ void clean(String name, byte[] pattern); + /** + * Remove all keys following the given pattern expecting all entries to be immediately invisible for subsequent + * lookups. + * + * @param name cache name must not be {@literal null}. + * @param pattern pattern for the keys to remove. Must not be {@literal null}. + * @return {@code true} if the cache was known to have mappings before, {@code false} if it did not (or if prior + * presence of entries could not be determined). + */ + default boolean invalidate(String name, byte[] pattern) { + clean(name, pattern); + return false; + } + /** * Reset all statistics counters and gauges for this cache. * @@ -323,4 +357,5 @@ static TtlFunction persistent() { Duration getTimeToLive(Object key, @Nullable Object value); } + } diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java index 4c4a55a37c..3576b75c5e 100644 --- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java +++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java @@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.*; import static org.springframework.data.redis.cache.RedisCacheWriter.*; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; @@ -32,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import org.awaitility.Awaitility; import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -263,7 +263,7 @@ void getWithValueLoaderShouldStoreCacheValue() { } @Test // DATAREDIS-481, DATAREDIS-1082 - void removeShouldDeleteEntry() { + void removeShouldDeleteEntry() throws InterruptedException { doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); @@ -272,6 +272,27 @@ void removeShouldDeleteEntry() { writer.remove(CACHE_NAME, binaryCacheKey); + if (writer.supportsAsyncRetrieve()) { + doWithConnection(connection -> { + Awaitility.await().until(() -> !connection.exists(binaryCacheKey)); + }); + } + + doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isFalse()); + + assertThat(writer.getCacheStatistics(CACHE_NAME).getDeletes()).isOne(); + } + + @Test // GH-3236 + void removeShouldDeleteEntryIfExists() { + + doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); + + RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) + .withStatisticsCollector(CacheStatisticsCollector.create()); + + assertThat(writer.removeIfPresent(CACHE_NAME, binaryCacheKey)).isTrue(); + doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isFalse()); assertThat(writer.getCacheStatistics(CACHE_NAME).getDeletes()).isOne(); @@ -288,7 +309,34 @@ void cleanShouldRemoveAllKeysByPattern() { RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) .withStatisticsCollector(CacheStatisticsCollector.create()); - writer.clean(CACHE_NAME, (CACHE_NAME + "::*").getBytes(Charset.forName("UTF-8"))); + writer.clean(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8)); + + if (writer.supportsAsyncRetrieve()) { + doWithConnection(connection -> { + Awaitility.await().until(() -> !connection.exists(binaryCacheKey)); + }); + } + + doWithConnection(connection -> { + assertThat(connection.exists(binaryCacheKey)).isFalse(); + assertThat(connection.exists("foo".getBytes())).isTrue(); + }); + + assertThat(writer.getCacheStatistics(CACHE_NAME).getDeletes()).isOne(); + } + + @Test // GH-3236 + void invalidateShouldRemoveAllKeysByPattern() { + + doWithConnection(connection -> { + connection.set(binaryCacheKey, binaryCacheValue); + connection.set("foo".getBytes(), "bar".getBytes()); + }); + + RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) + .withStatisticsCollector(CacheStatisticsCollector.create()); + + assertThat(writer.invalidate(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8))).isTrue(); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); diff --git a/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java b/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java index 2fc0795872..bf3a4f00e3 100644 --- a/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java +++ b/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java @@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedClass; @@ -147,6 +148,10 @@ void testCacheClear() { assertThat(cache.get(key2)).isNull(); cache.put(key2, value2); cache.clear(); + + Awaitility.await().until(() -> cache.get(key1) == null); + Awaitility.await().until(() -> cache.get(key2) == null); + assertThat(cache.get(key2)).isNull(); assertThat(cache.get(key1)).isNull(); } diff --git a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java index da46400668..2d6c2fc31b 100644 --- a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java +++ b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java @@ -30,10 +30,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.assertj.core.api.ThrowingConsumer; +import org.awaitility.Awaitility; import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -46,6 +47,7 @@ import org.springframework.cache.support.NullValue; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.lettuce.LettuceConnection; import org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.test.condition.EnabledOnCommand; @@ -115,6 +117,8 @@ void cacheShouldBeClearedByPattern() { String keyPattern = "*" + key.substring(1); cache.clear(keyPattern); + maybeWaitForNonBlockingClean(); + doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isFalse()); } @@ -126,6 +130,8 @@ void cacheShouldNotBeClearedIfNoPatternMatch() { String keyPattern = "*" + key.substring(1) + "tail"; cache.clear(keyPattern); + maybeWaitForNonBlockingClean(); + doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isTrue()); } @@ -289,6 +295,25 @@ void evictShouldRemoveKey() { cache.evict(key); + Awaitility.await().until(() -> cache.get(key) == null); + + doWithConnection(connection -> { + assertThat(connection.exists(binaryCacheKey)).isFalse(); + assertThat(connection.exists("other".getBytes())).isTrue(); + }); + } + + @Test // GH-3236 + void evictShouldRemoveKeyIfPresent() { + + doWithConnection(connection -> { + connection.set(binaryCacheKey, binaryNullValue); + connection.set("other".getBytes(), "value".getBytes()); + }); + + assertThat(cache.evictIfPresent(key)).isTrue(); + assertThat(cache.evictIfPresent(key)).isFalse(); + doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); assertThat(connection.exists("other".getBytes())).isTrue(); @@ -305,6 +330,24 @@ void clearShouldClearCache() { cache.clear(); + maybeWaitForNonBlockingClean(); + + doWithConnection(connection -> { + assertThat(connection.exists(binaryCacheKey)).isFalse(); + assertThat(connection.exists("other".getBytes())).isTrue(); + }); + } + + @Test // GH-2028 + void clearShouldInvalidateCache() { + + doWithConnection(connection -> { + connection.set(binaryCacheKey, binaryNullValue); + connection.set("other".getBytes(), "value".getBytes()); + }); + + assertThat(cache.invalidate()).isTrue(); + doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); assertThat(connection.exists("other".getBytes())).isTrue(); @@ -327,6 +370,8 @@ void clearWithScanShouldClearCache() { cache.clear(); + maybeWaitForNonBlockingClean(); + doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); assertThat(connection.exists("cache::foo".getBytes())).isFalse(); @@ -712,12 +757,22 @@ private Function withTtiExpira return entryTtlFunction.andThen(RedisCacheConfiguration::enableTimeToIdle); } - void doWithConnection(Consumer callback) { + void doWithConnection(ThrowingConsumer callback) { try (RedisConnection connection = connectionFactory.getConnection()) { callback.accept(connection); } } + private void maybeWaitForNonBlockingClean() { + doWithConnection(redisConnection -> { + // Cache interface specifies non-blocking behavior, so we've need to wait for Lettuce async to avoid races between + // the process and our assertion here. + if (redisConnection instanceof LettuceConnection) { + Thread.sleep(250); + } + }); + } + static class Person implements Serializable { private String firstname; From 46942de3141988518152db61ef8d76e0a442b1da Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 22 Oct 2025 15:41:21 +0200 Subject: [PATCH 03/11] Polishing. Convert internal types to records. --- .../data/redis/cache/BatchStrategies.java | 19 ++----------------- .../redis/cache/DefaultRedisCacheWriter.java | 2 +- 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java b/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java index 14f3922974..f299f9ecd6 100644 --- a/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java +++ b/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java @@ -98,13 +98,7 @@ public long cleanCache(RedisConnection connection, String name, byte[] pattern) /** * {@link BatchStrategy} using {@code SCAN}. */ - static class Scan implements BatchStrategy { - - final int batchSize; - - Scan(int batchSize) { - this.batchSize = batchSize; - } + record Scan(int batchSize) implements BatchStrategy { @Override public long cleanCache(RedisConnection connection, String name, byte[] pattern) { @@ -138,16 +132,7 @@ public long cleanCache(RedisConnection connection, String name, byte[] pattern) * * @param */ - static class PartitionIterator implements Iterator> { - - private final Iterator iterator; - private final int size; - - PartitionIterator(Iterator iterator, int size) { - - this.iterator = iterator; - this.size = size; - } + record PartitionIterator(Iterator iterator, int size) implements Iterator> { @Override public boolean hasNext() { diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index bf2fd9a980..3ceed612c7 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -598,7 +598,7 @@ class AsynchronousCacheWriterDelegate implements AsyncCacheWriter { private final int cleanBatchSize; public AsynchronousCacheWriterDelegate() { - this.cleanBatchSize = batchStrategy instanceof BatchStrategies.Scan scan ? scan.batchSize + this.cleanBatchSize = batchStrategy instanceof BatchStrategies.Scan scan ? scan.batchSize() : DEFAULT_SCAN_BATCH_SIZE; } From ac70130e4759d631c166707dd3916a71f8048ff0 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 24 Oct 2025 12:18:48 +0200 Subject: [PATCH 04/11] Use non-blocking puts. --- .../redis/cache/DefaultRedisCacheWriter.java | 31 ++++---- .../data/redis/cache/RedisCache.java | 2 +- .../cache/DefaultRedisCacheWriterTests.java | 25 +++++-- .../redis/cache/LegacyRedisCacheTests.java | 47 ++++++++---- .../data/redis/cache/RedisCacheTests.java | 75 +++++++++++-------- 5 files changed, 108 insertions(+), 72 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index 3ceed612c7..2a2dce7f9f 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -238,11 +238,14 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) { Assert.notNull(key, "Key must not be null"); Assert.notNull(value, "Value must not be null"); - execute(name, connection -> { - doPut(connection, name, key, value, ttl); - return "OK"; - }); - + if (supportsAsyncRetrieve()) { + asyncCacheWriter.store(name, key, value, ttl).thenRun(() -> statistics.incPuts(name)); + } else { + execute(name, connection -> { + doPut(connection, name, key, value, ttl); + return "OK"; + }); + } } @SuppressWarnings("NullAway") @@ -620,7 +623,7 @@ public CompletableFuture retrieve(String name, byte[] key, @Nullable Dur Mono get = shouldExpireWithin(ttl) ? stringCommands.getEx(wrappedKey, Expiration.from(ttl)) : stringCommands.get(wrappedKey); - return cacheLockCheck.then(get).map(ByteUtils::getBytes).toFuture(); + return cacheLockCheck.then(get).map(ByteUtils::getBytes); }); } @@ -631,7 +634,7 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul Mono mono = doWithLocking(name, key, value, connection, () -> doStore(key, value, ttl, connection)); - return mono.then().toFuture(); + return mono.then(); }); } @@ -654,10 +657,7 @@ private Mono doStore(byte[] cacheKey, byte[] value, @Nullable Duration public CompletableFuture remove(String name, byte[] key) { return doWithConnection(connection -> { - - Mono mono = doWithLocking(name, key, null, connection, () -> doRemove(key, connection)); - - return mono.then().toFuture(); + return doWithLocking(name, key, null, connection, () -> doRemove(key, connection)).then(); }); } @@ -665,10 +665,7 @@ public CompletableFuture remove(String name, byte[] key) { public CompletableFuture clean(String name, byte[] pattern, BatchStrategy batchStrategy) { return doWithConnection(connection -> { - - Mono mono = doWithLocking(name, pattern, null, connection, () -> doClean(pattern, connection)); - - return mono.toFuture(); + return doWithLocking(name, pattern, null, connection, () -> doClean(pattern, connection)); }); } @@ -741,12 +738,12 @@ private Mono waitForLock(ReactiveRedisConnection connection, String cacheN } private CompletableFuture doWithConnection( - Function> callback) { + Function> callback) { ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory) connectionFactory; return Mono.usingWhen(Mono.fromSupplier(cf::getReactiveConnection), // - it -> Mono.fromCompletionStage(callback.apply(it)), // + callback::apply, // ReactiveRedisConnection::closeLater) // .toFuture(); } diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCache.java b/src/main/java/org/springframework/data/redis/cache/RedisCache.java index 4ed84af24b..0f796a91f2 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCache.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCache.java @@ -493,7 +493,7 @@ private String convertCollectionLikeOrMapKey(Object key, TypeDescriptor source) throw new IllegalArgumentException("Cannot convert cache key [%s] to String".formatted(key)); } - private byte[] createAndConvertCacheKey(Object key) { + byte[] createAndConvertCacheKey(Object key) { return serializeCacheKey(createCacheKey(key)); } diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java index 3576b75c5e..d9650c8323 100644 --- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java +++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java @@ -32,7 +32,9 @@ import java.util.function.Consumer; import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -60,6 +62,8 @@ public class DefaultRedisCacheWriterTests { private static final String CACHE_NAME = "default-redis-cache-writer-tests"; + static ConditionFactory await = Awaitility.with().pollDelay(Duration.ZERO).pollInSameThread(); + private String key = "key-1"; private String cacheKey = CACHE_NAME + "::" + key; @@ -88,7 +92,7 @@ void putShouldAddEternalEntry() { RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) .withStatisticsCollector(CacheStatisticsCollector.create()); - writer.put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); + writer.putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); doWithConnection(connection -> { assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); @@ -102,7 +106,7 @@ void putShouldAddEternalEntry() { @Test // DATAREDIS-481 void putShouldAddExpiringEntry() { - nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue, + nonLockingRedisCacheWriter(connectionFactory).putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ofSeconds(1)); doWithConnection(connection -> { @@ -119,6 +123,9 @@ void putShouldOverwriteExistingEternalEntry() { nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); doWithConnection(connection -> { + + await.until(() -> connection.ttl(binaryCacheKey) == -1); + assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); assertThat(connection.ttl(binaryCacheKey)).isEqualTo(-1); }); @@ -134,6 +141,7 @@ void putShouldOverwriteExistingExpiringEntryAndResetTtl() { Duration.ofSeconds(5)); doWithConnection(connection -> { + await.until(() -> connection.ttl(binaryCacheKey) < 50); assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); assertThat(connection.ttl(binaryCacheKey)).isGreaterThan(3).isLessThan(6); }); @@ -274,7 +282,7 @@ void removeShouldDeleteEntry() throws InterruptedException { if (writer.supportsAsyncRetrieve()) { doWithConnection(connection -> { - Awaitility.await().until(() -> !connection.exists(binaryCacheKey)); + await.until(() -> !connection.exists(binaryCacheKey)); }); } @@ -313,7 +321,7 @@ void cleanShouldRemoveAllKeysByPattern() { if (writer.supportsAsyncRetrieve()) { doWithConnection(connection -> { - Awaitility.await().until(() -> !connection.exists(binaryCacheKey)); + await.until(() -> !connection.exists(binaryCacheKey)); }); } @@ -351,7 +359,8 @@ void nonLockingCacheWriterShouldIgnoreExistingLock() { ((DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory)).lock(CACHE_NAME); - nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); + nonLockingRedisCacheWriter(connectionFactory).putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, + Duration.ZERO); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isTrue(); @@ -367,6 +376,7 @@ void lockingCacheWriterShouldIgnoreExistingLockOnDifferenceCache() { Duration.ZERO); doWithConnection(connection -> { + await.until(() -> connection.exists(binaryCacheKey)); assertThat(connection.exists(binaryCacheKey)).isTrue(); }); } @@ -393,8 +403,6 @@ void lockingCacheWriterShouldWaitForLockRelease() throws InterruptedException { beforeWrite.await(); - Thread.sleep(200); - doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); }); @@ -403,6 +411,7 @@ void lockingCacheWriterShouldWaitForLockRelease() throws InterruptedException { afterWrite.await(); doWithConnection(connection -> { + await.until(() -> connection.exists(binaryCacheKey)); assertThat(connection.exists(binaryCacheKey)).isTrue(); }); @@ -419,6 +428,8 @@ void lockingCacheWriterShouldExitWhenInterruptedWaitForLockRelease() throws Inte DefaultRedisCacheWriter cw = (DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory); cw.lock(CACHE_NAME); + Assumptions.assumeFalse(cw.supportsAsyncRetrieve()); + CountDownLatch beforeWrite = new CountDownLatch(1); CountDownLatch afterWrite = new CountDownLatch(1); AtomicReference exceptionRef = new AtomicReference<>(); diff --git a/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java b/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java index bf3a4f00e3..9cacd05e80 100644 --- a/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java +++ b/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java @@ -25,11 +25,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedClass; @@ -58,9 +61,10 @@ public class LegacyRedisCacheTests { private static final String CACHE_NAME = "testCache"; - private final boolean allowCacheNullValues; + static ConditionFactory await = Awaitility.with().pollDelay(Duration.ZERO).pollInSameThread(); + private ObjectFactory keyFactory; private ObjectFactory valueFactory; @@ -80,9 +84,10 @@ public LegacyRedisCacheTests(RedisTemplate template, ObjectFactory keyFa public static Collection testParams() { + System.out.println("tp"); Collection params = AbstractOperationsTestParams.testParams(); - Collection target = new ArrayList<>(); + List target = new ArrayList<>(); for (Object[] source : params) { Object[] cacheNullDisabled = Arrays.copyOf(source, source.length + 1); @@ -127,7 +132,7 @@ void testCachePut() { assertThat(value).isNotNull(); assertThat(cache.get(key)).isNull(); - cache.put(key, value); + putNow(key, value); ValueWrapper valueWrapper = cache.get(key); if (valueWrapper != null) { assertThat(valueWrapper.get()).isEqualTo(value); @@ -149,8 +154,7 @@ void testCacheClear() { cache.put(key2, value2); cache.clear(); - Awaitility.await().until(() -> cache.get(key1) == null); - Awaitility.await().until(() -> cache.get(key2) == null); + await.until(() -> cache.get(key1) == null && cache.get(key2) == null); assertThat(cache.get(key2)).isNull(); assertThat(cache.get(key1)).isNull(); @@ -178,7 +182,7 @@ void testConcurrentRead() throws Exception { Thread th = new Thread(() -> { cache.clear(); cache.put(k1, v1); - cache.put(k2, v2); + putNow(k2, v2); failed.set(v1.equals(cache.get(k1))); }, "concurrent-cache-access"); @@ -193,7 +197,7 @@ void testConcurrentRead() throws Exception { final Object value4 = getValue(); cache.put(key3, value3); - cache.put(key4, value4); + putNow(key4, value4); assertThat(cache.get(key1)).isNull(); assertThat(cache.get(key2)).isNull(); @@ -233,7 +237,7 @@ void testCacheGetShouldReturnCachedInstance() { Object key = getKey(); Object value = getValue(); - cache.put(key, value); + putNow(key, value); assertThat(value).isEqualTo(cache.get(key, Object.class)); } @@ -243,7 +247,7 @@ void testCacheGetShouldRetunInstanceOfCorrectType() { Object key = getKey(); Object value = getValue(); - cache.put(key, value); + putNow(key, value); assertThat(cache.get(key, value.getClass())).isInstanceOf(value.getClass()); } @@ -253,7 +257,7 @@ void testCacheGetShouldThrowExceptionOnInvalidType() { Object key = getKey(); Object value = getValue(); - cache.put(key, value); + putNow(key, value); assertThatIllegalStateException().isThrownBy(() -> cache.get(key, Cache.class)); } @@ -263,7 +267,7 @@ void testCacheGetShouldReturnNullIfNoCachedValueFound() { Object key = getKey(); Object value = getValue(); - cache.put(key, value); + putNow(key, value); Object invalidKey = "spring-data-redis".getBytes(); assertThat(cache.get(invalidKey, value.getClass())).isNull(); @@ -305,7 +309,7 @@ void cachePutWithNullShouldErrorAndLeaveExistingKeyUntouched() { Object key = getKey(); Object value = getValue(); - cache.put(key, value); + putNow(key, value); assertThat(cache.get(key).get()).isEqualTo(value); @@ -330,7 +334,7 @@ void cachePutWithNullShouldAddStuffToRedisWhenCachingNullIsEnabled() { Object key = getKey(); Object value = getValue(); - cache.put(key, null); + putNow(key, null); assertThat(cache.get(key, String.class)).isNull(); } @@ -338,7 +342,7 @@ void cachePutWithNullShouldAddStuffToRedisWhenCachingNullIsEnabled() { @Test // DATAREDIS-553 void testCacheGetSynchronizedNullAllowingNull() { - assumeThat(allowCacheNullValues).as("Only suitable when cache does allow null values.").isTrue(); + Assumptions.assumeTrue(allowCacheNullValues, "Only suitable when cache does allow null values."); Object key = getKey(); Object value = cache.get(key, () -> null); @@ -374,13 +378,26 @@ void testCacheGetSynchronizedNullWithStoredNull() { assumeThat(allowCacheNullValues).as("Only suitable when cache does allow null values").isTrue(); Object key = getKey(); - cache.put(key, null); + putNow(key, null); Object cachedValue = cache.get(key, () -> null); assertThat(cachedValue).isNull(); } + private void putNow(Object key, Object value) { + putNow(cache, key, value); + } + + private void putNow(RedisCache cache, Object key, Object value) { + + cache.put(key, value); + + if (cache.getCacheWriter().supportsAsyncRetrieve()) { + await.until(() -> cache.get(key) != null); + } + } + @SuppressWarnings("unused") private static class CacheGetWithValueLoaderIsThreadSafe extends MultithreadedTestCase { diff --git a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java index 2d6c2fc31b..0f3a748389 100644 --- a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java +++ b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java @@ -35,6 +35,7 @@ import org.assertj.core.api.ThrowingConsumer; import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -47,7 +48,6 @@ import org.springframework.cache.support.NullValue; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.lettuce.LettuceConnection; import org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.test.condition.EnabledOnCommand; @@ -69,6 +69,8 @@ @MethodSource("testParams") public class RedisCacheTests { + static ConditionFactory await = Awaitility.with().pollDelay(Duration.ZERO).pollInSameThread(); + private String key = "key-1"; private String cacheKey = "cache::" + key; private byte[] binaryCacheKey = cacheKey.getBytes(StandardCharsets.UTF_8); @@ -104,34 +106,32 @@ void setUp() { @Test // DATAREDIS-481 void putShouldAddEntry() { - cache.put("key-1", sample); - + putNow(key, sample); doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isTrue()); } @Test // GH-2379 void cacheShouldBeClearedByPattern() { - cache.put(key, sample); + putNow(key, sample); String keyPattern = "*" + key.substring(1); cache.clear(keyPattern); - maybeWaitForNonBlockingClean(); - + if (cache.getCacheWriter().supportsAsyncRetrieve()) { + doWithConnection(connection -> await.until(() -> !connection.exists(binaryCacheKey))); + } doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isFalse()); } @Test // GH-2379 void cacheShouldNotBeClearedIfNoPatternMatch() { - cache.put(key, sample); + putNow(key, sample); String keyPattern = "*" + key.substring(1) + "tail"; cache.clear(keyPattern); - maybeWaitForNonBlockingClean(); - doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isTrue()); } @@ -139,7 +139,7 @@ void cacheShouldNotBeClearedIfNoPatternMatch() { // DATAREDIS-481 void putNullShouldAddEntryForNullValue() { - cache.put("key-1", null); + putNow(key, null); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isTrue(); @@ -150,7 +150,7 @@ void putNullShouldAddEntryForNullValue() { @Test // DATAREDIS-481 void putIfAbsentShouldAddEntryIfNotExists() { - cache.putIfAbsent("key-1", sample); + cache.putIfAbsent(key, sample); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isTrue(); @@ -161,7 +161,7 @@ void putIfAbsentShouldAddEntryIfNotExists() { @Test // DATAREDIS-481 void putIfAbsentWithNullShouldAddNullValueEntryIfNotExists() { - assertThat(cache.putIfAbsent("key-1", null)).isNull(); + assertThat(cache.putIfAbsent(key, null)).isNull(); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isTrue(); @@ -210,7 +210,7 @@ void shouldReadAndWriteSimpleCacheKey() { SimpleKey key = new SimpleKey("param-1", "param-2"); - cache.put(key, sample); + putNow(key, sample); ValueWrapper result = cache.get(key); assertThat(result).isNotNull(); @@ -230,7 +230,7 @@ void shouldAllowComplexKeyWithToStringMethod() { ComplexKey key = new ComplexKey(sample.getFirstname(), sample.getBirthdate()); - cache.put(key, sample); + putNow(key, sample); ValueWrapper result = cache.get(key); @@ -295,7 +295,9 @@ void evictShouldRemoveKey() { cache.evict(key); - Awaitility.await().until(() -> cache.get(key) == null); + if (cache.getCacheWriter().supportsAsyncRetrieve()) { + await.until(() -> cache.get(key) == null); + } doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); @@ -330,7 +332,9 @@ void clearShouldClearCache() { cache.clear(); - maybeWaitForNonBlockingClean(); + if (cache.getCacheWriter().supportsAsyncRetrieve()) { + doWithConnection(connection -> await.until(() -> !connection.exists(binaryCacheKey))); + } doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); @@ -370,9 +374,12 @@ void clearWithScanShouldClearCache() { cache.clear(); - maybeWaitForNonBlockingClean(); - doWithConnection(connection -> { + + if (cache.getCacheWriter().supportsAsyncRetrieve()) { + await.until(() -> !connection.exists(binaryCacheKey)); + } + assertThat(connection.exists(binaryCacheKey)).isFalse(); assertThat(connection.exists("cache::foo".getBytes())).isFalse(); assertThat(connection.exists("other".getBytes())).isTrue(); @@ -413,7 +420,7 @@ void computePrefixCreatesCacheKeyCorrectly() { RedisCacheConfiguration.defaultCacheConfig().serializeValuesWith(SerializationPair.fromSerializer(serializer)) .computePrefixWith(cacheName -> "_" + cacheName + "_")); - cacheWithCustomPrefix.put("key-1", sample); + putNow(cacheWithCustomPrefix, key, sample); doWithConnection( connection -> assertThat(connection.stringCommands().get("_cache_key-1".getBytes(StandardCharsets.UTF_8))) @@ -427,7 +434,7 @@ void prefixCacheNameCreatesCacheKeyCorrectly() { RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory), RedisCacheConfiguration.defaultCacheConfig() .serializeValuesWith(SerializationPair.fromSerializer(serializer)).prefixCacheNameWith("redis::")); - cacheWithCustomPrefix.put("key-1", sample); + putNow(cacheWithCustomPrefix, key, sample); doWithConnection(connection -> assertThat( connection.stringCommands().get("redis::cache::key-1".getBytes(StandardCharsets.UTF_8))) @@ -454,7 +461,7 @@ void fetchKeyWithComputedPrefixReturnsExpectedResult() { void cacheShouldAllowListKeyCacheKeysOfSimpleTypes() { Object key = SimpleKeyGenerator.generateKey(Collections.singletonList("my-cache-key-in-a-list")); - cache.put(key, sample); + putNow(key, sample); ValueWrapper target = cache .get(SimpleKeyGenerator.generateKey(Collections.singletonList("my-cache-key-in-a-list"))); @@ -466,7 +473,7 @@ void cacheShouldAllowListKeyCacheKeysOfSimpleTypes() { void cacheShouldAllowArrayKeyCacheKeysOfSimpleTypes() { Object key = SimpleKeyGenerator.generateKey("my-cache-key-in-an-array"); - cache.put(key, sample); + putNow(key, sample); ValueWrapper target = cache.get(SimpleKeyGenerator.generateKey("my-cache-key-in-an-array")); @@ -478,7 +485,7 @@ void cacheShouldAllowListCacheKeysOfComplexTypes() { Object key = SimpleKeyGenerator .generateKey(Collections.singletonList(new ComplexKey(sample.getFirstname(), sample.getBirthdate()))); - cache.put(key, sample); + putNow(key, sample); ValueWrapper target = cache.get(SimpleKeyGenerator .generateKey(Collections.singletonList(new ComplexKey(sample.getFirstname(), sample.getBirthdate())))); @@ -491,7 +498,7 @@ void cacheShouldAllowMapCacheKeys() { Object key = SimpleKeyGenerator .generateKey(Collections.singletonMap("map-key", new ComplexKey(sample.getFirstname(), sample.getBirthdate()))); - cache.put(key, sample); + putNow(key, sample); ValueWrapper target = cache.get(SimpleKeyGenerator.generateKey( Collections.singletonMap("map-key", new ComplexKey(sample.getFirstname(), sample.getBirthdate())))); @@ -763,14 +770,18 @@ void doWithConnection(ThrowingConsumer callback) { } } - private void maybeWaitForNonBlockingClean() { - doWithConnection(redisConnection -> { - // Cache interface specifies non-blocking behavior, so we've need to wait for Lettuce async to avoid races between - // the process and our assertion here. - if (redisConnection instanceof LettuceConnection) { - Thread.sleep(250); - } - }); + private void putNow(Object key, Object value) { + putNow(cache, key, value); + } + + private void putNow(RedisCache cache, Object key, Object value) { + + byte[] cacheKey = cache.createAndConvertCacheKey(key); + cache.put(key, value); + + if (cache.getCacheWriter().supportsAsyncRetrieve()) { + doWithConnection(connection -> await.until(() -> connection.exists(cacheKey))); + } } static class Person implements Serializable { From 3f97c414aba6451461db9d4bee1221e5a9fb52ec Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 24 Oct 2025 15:10:00 +0200 Subject: [PATCH 05/11] Introduce configuration API for RedisCacheWriter. Also, allow configuration of immediate writes to enforce stronger consistency and visibility of cache writes. --- .../redis/cache/DefaultRedisCacheWriter.java | 132 +++++++++++-- .../data/redis/cache/RedisCacheWriter.java | 184 ++++++++++++++++-- .../cache/DefaultRedisCacheWriterTests.java | 142 ++++++++------ .../redis/cache/LegacyRedisCacheTests.java | 43 ++-- .../data/redis/cache/RedisCacheTests.java | 75 +++---- 5 files changed, 408 insertions(+), 168 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index 2a2dce7f9f..4a8bd0c25d 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -84,6 +84,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { private final AsyncCacheWriter asyncCacheWriter; + private final boolean asynchronousWrites; + /** * @param connectionFactory must not be {@literal null}. * @param batchStrategy must not be {@literal null}. @@ -99,19 +101,11 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { * @param batchStrategy must not be {@literal null}. */ DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, BatchStrategy batchStrategy) { - this(connectionFactory, sleepTime, TtlFunction.persistent(), CacheStatisticsCollector.none(), batchStrategy); + this(connectionFactory, sleepTime, TtlFunction.persistent(), CacheStatisticsCollector.none(), batchStrategy, true); } - /** - * @param connectionFactory must not be {@literal null}. - * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO} - * to disable locking. - * @param lockTtl Lock TTL function must not be {@literal null}. - * @param cacheStatisticsCollector must not be {@literal null}. - * @param batchStrategy must not be {@literal null}. - */ DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, TtlFunction lockTtl, - CacheStatisticsCollector cacheStatisticsCollector, BatchStrategy batchStrategy) { + CacheStatisticsCollector cacheStatisticsCollector, BatchStrategy batchStrategy, boolean asynchronousWrites) { Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); Assert.notNull(sleepTime, "SleepTime must not be null"); @@ -126,12 +120,116 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { this.batchStrategy = batchStrategy; if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this.connectionFactory instanceof ReactiveRedisConnectionFactory) { - asyncCacheWriter = new AsynchronousCacheWriterDelegate(); + this.asyncCacheWriter = new AsynchronousCacheWriterDelegate(); + this.asynchronousWrites = asynchronousWrites; } else { asyncCacheWriter = UnsupportedAsyncCacheWriter.INSTANCE; + this.asynchronousWrites = false; } } + /** + * Create a new {@code DefaultRedisCacheWriter} applying configuration through {@code configurerConsumer}. + * + * @param connectionFactory the connection factory to use. + * @param configurerConsumer configuration consumer. + * @return a new {@code DefaultRedisCacheWriter}. + * @since 4.0 + */ + public static DefaultRedisCacheWriter create(RedisConnectionFactory connectionFactory, + Consumer configurerConsumer) { + + Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null"); + Assert.notNull(configurerConsumer, "RedisCacheWriterConfigurer function must not be null"); + + DefaultRedisCacheWriterConfigurer config = new DefaultRedisCacheWriterConfigurer(); + configurerConsumer.accept(config); + + return new DefaultRedisCacheWriter(connectionFactory, config.lockSleepTime, config.lockTtlFunction, + config.cacheStatisticsCollector, config.batchStrategy, !config.immediateWrites); + } + + static class DefaultRedisCacheWriterConfigurer + implements RedisCacheWriterConfigurer, CacheLockingConfigurer, CacheLockingConfiguration { + + CacheStatisticsCollector cacheStatisticsCollector = CacheStatisticsCollector.none(); + BatchStrategy batchStrategy = BatchStrategies.keys(); + Duration lockSleepTime = Duration.ZERO; + TtlFunction lockTtlFunction = TtlFunction.persistent(); + boolean immediateWrites = false; + + @Override + public RedisCacheWriterConfigurer collectStatistics(CacheStatisticsCollector cacheStatisticsCollector) { + + Assert.notNull(cacheStatisticsCollector, "CacheStatisticsCollector must not be null"); + this.cacheStatisticsCollector = cacheStatisticsCollector; + + return this; + } + + @Override + public RedisCacheWriterConfigurer batchStrategy(BatchStrategy batchStrategy) { + + Assert.notNull(batchStrategy, "BatchStrategy must not be null"); + this.batchStrategy = batchStrategy; + + return this; + } + + @Override + public RedisCacheWriterConfigurer cacheLocking(Consumer configurerConsumer) { + + Assert.notNull(configurerConsumer, "CacheLockingConfigurer function must not be null"); + configurerConsumer.accept(this); + + return this; + } + + @Override + public RedisCacheWriterConfigurer immediateWrites(boolean enableImmediateWrites) { + + this.immediateWrites = enableImmediateWrites; + return this; + } + + @Override + public void disable() { + this.lockSleepTime = Duration.ZERO; + } + + @Override + public void enable(Consumer configurationConsumer) { + + Assert.notNull(configurationConsumer, "CacheLockingConfigurer function must not be null"); + + if (this.lockSleepTime.isZero() || this.lockSleepTime.isNegative()) { + this.lockSleepTime = Duration.ofMillis(50); + } + configurationConsumer.accept(this); + } + + @Override + public CacheLockingConfiguration sleepTime(Duration sleepTime) { + + Assert.notNull(sleepTime, "Lock sleep time must not be null"); + Assert.isTrue(!sleepTime.isZero() && !sleepTime.isNegative(), + "Lock sleep time must not be null zero or negative"); + + this.lockSleepTime = sleepTime; + return this; + } + + @Override + public CacheLockingConfiguration lockTimeout(TtlFunction ttlFunction) { + + Assert.notNull(ttlFunction, "TTL function must not be null"); + + this.lockTtlFunction = ttlFunction; + return this; + } + + } + @Override public byte @Nullable [] get(String name, byte[] key) { return get(name, key, null); @@ -210,6 +308,10 @@ public boolean supportsAsyncRetrieve() { return asyncCacheWriter.isSupported(); } + private boolean writeAsynchronously() { + return supportsAsyncRetrieve() && asynchronousWrites; + } + @Override public CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl) { @@ -238,7 +340,7 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) { Assert.notNull(key, "Key must not be null"); Assert.notNull(value, "Value must not be null"); - if (supportsAsyncRetrieve()) { + if (writeAsynchronously()) { asyncCacheWriter.store(name, key, value, ttl).thenRun(() -> statistics.incPuts(name)); } else { execute(name, connection -> { @@ -318,7 +420,7 @@ public void remove(String name, byte[] key) { Assert.notNull(name, "Name must not be null"); Assert.notNull(key, "Key must not be null"); - if (supportsAsyncRetrieve()) { + if (writeAsynchronously()) { asyncCacheWriter.remove(name, key).thenRun(() -> statistics.incDeletes(name)); } else { removeIfPresent(name, key); @@ -340,7 +442,7 @@ public void clean(String name, byte[] pattern) { Assert.notNull(name, "Name must not be null"); Assert.notNull(pattern, "Pattern must not be null"); - if (supportsAsyncRetrieve()) { + if (writeAsynchronously()) { asyncCacheWriter.clean(name, pattern, batchStrategy) .thenAccept(deleteCount -> statistics.incDeletesBy(name, deleteCount.intValue())); return; @@ -393,7 +495,7 @@ public void clearStatistics(String name) { @Override public RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheStatisticsCollector) { return new DefaultRedisCacheWriter(connectionFactory, sleepTime, lockTtl, cacheStatisticsCollector, - this.batchStrategy); + this.batchStrategy, this.asynchronousWrites); } /** diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java index e3c2fdb64c..1bf425a382 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java @@ -17,6 +17,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.Supplier; import org.jspecify.annotations.Nullable; @@ -41,7 +42,21 @@ public interface RedisCacheWriter extends CacheStatisticsProvider { /** - * Create new {@link RedisCacheWriter} without locking behavior. + * Create new {@link RedisCacheWriter} configure it through {@link RedisCacheWriterConfigurer}. The cache writer + * defaults does not lock the cache by default using {@link BatchStrategies#keys()}. + * + * @param connectionFactory the connection factory to use. + * @param configurerConsumer a configuration function that configures {@link RedisCacheWriterConfigurer}. + * @return new instance of {@link DefaultRedisCacheWriter}. + * @since 4.0 + */ + static RedisCacheWriter create(RedisConnectionFactory connectionFactory, + Consumer configurerConsumer) { + return DefaultRedisCacheWriter.create(connectionFactory, configurerConsumer); + } + + /** + * Create new {@link RedisCacheWriter} without locking behavior using {@link BatchStrategies#keys()}. * * @param connectionFactory must not be {@literal null}. * @return new instance of {@link DefaultRedisCacheWriter}. @@ -60,15 +75,11 @@ static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connec */ static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy) { - - Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); - Assert.notNull(batchStrategy, "BatchStrategy must not be null"); - - return new DefaultRedisCacheWriter(connectionFactory, batchStrategy); + return create(connectionFactory, config -> config.batchStrategy(batchStrategy)); } /** - * Create new {@link RedisCacheWriter} with locking behavior. + * Create new {@link RedisCacheWriter} with locking behavior using {@link BatchStrategies#keys()}. * * @param connectionFactory must not be {@literal null}. * @return new instance of {@link DefaultRedisCacheWriter}. @@ -88,7 +99,8 @@ static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectio static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy) { - return lockingRedisCacheWriter(connectionFactory, Duration.ofMillis(50), TtlFunction.persistent(), batchStrategy); + return create(connectionFactory, + it -> it.batchStrategy(batchStrategy).cacheLocking(CacheLockingConfigurer::enable)); } /** @@ -105,10 +117,8 @@ static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectio static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, TtlFunction lockTtlFunction, BatchStrategy batchStrategy) { - Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); - - return new DefaultRedisCacheWriter(connectionFactory, sleepTime, lockTtlFunction, CacheStatisticsCollector.none(), - batchStrategy); + return create(connectionFactory, it -> it.batchStrategy(batchStrategy) + .enableLocking(locking -> locking.sleepTime(sleepTime).lockTimeout(lockTtlFunction))); } /** @@ -307,6 +317,156 @@ default boolean invalidate(String name, byte[] pattern) { */ RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheStatisticsCollector); + /** + * Interface that allows for configuring a {@link RedisCacheWriter}. + * + * @author Mark Paluch + * @since 4.0 + */ + interface RedisCacheWriterConfigurer { + + /** + * Configure the {@link CacheStatisticsCollector} to use. This is useful for plugging in and/or customizing + * statistics collection. + */ + default RedisCacheWriterConfigurer collectStatistics() { + return collectStatistics(CacheStatisticsCollector.create()); + } + + /** + * Configure the {@link CacheStatisticsCollector} to use. This is useful for plugging in and/or customizing + * statistics collection. + *

+ * If no statistics collector is specified, no statistics will be recorded. Statistics collection can be + * reconfigured on the built RedisCacheWriter by invoking + * {@code RedisCacheWriter#withStatisticsCollector(CacheStatisticsCollector)}. + * + * @param cacheStatisticsCollector the statistics collector to use. + */ + RedisCacheWriterConfigurer collectStatistics(CacheStatisticsCollector cacheStatisticsCollector); + + /** + * Configure the {@link BatchStrategy} when clearing the cache (i.e. bulk removal of cache keys). + *

+ * If no batch strategy is specified, the RedisCacheWriter uses {@link BatchStrategies#keys()}; + * + * @param batchStrategy the batch strategy to use. + */ + RedisCacheWriterConfigurer batchStrategy(BatchStrategy batchStrategy); + + /** + * Enable cache locking to synchronize cache access across multiple cache instances. + */ + default RedisCacheWriterConfigurer enableLocking() { + return cacheLocking(it -> it.enable(config -> {})); + } + + /** + * Enable cache locking to synchronize cache access across multiple cache instances. + * + * @param configurerConsumer a configuration function that configures {@link CacheLockingConfiguration}. + */ + default RedisCacheWriterConfigurer enableLocking(Consumer configurerConsumer) { + return cacheLocking(it -> it.enable(configurerConsumer)); + } + + /** + * Configure cache locking to synchronize cache access across multiple cache instances. + * + * @param configurerConsumer a configuration function that configures {@link CacheLockingConfigurer}. + */ + RedisCacheWriterConfigurer cacheLocking(Consumer configurerConsumer); + + /** + * Use immediate writes (i.e. write operations such as + * {@link RedisCacheWriter#put(String, byte[], byte[], Duration)} or {@link #clean(String, byte[])}) shall apply + * immediately. + *

+ * Several {@link org.springframework.cache.Cache} operations can be performed asynchronously or deferred and this + * is the default behavior for {@link RedisCacheWriter}. Enable immediate writes in case a particular cache requires + * stronger consistency (i.e. Cache writes must be visible immediately). + *

+ * When using a {@link org.springframework.data.redis.connection.ReactiveRedisConnectionFactory reactive Redis + * driver}, immediate writes lead to blocking. + */ + default RedisCacheWriterConfigurer immediateWrites() { + return immediateWrites(true); + } + + /** + * Configure whether to use immediate writes (i.e. write operations such as + * {@link RedisCacheWriter#put(String, byte[], byte[], Duration)} or {@link #clean(String, byte[])}) shall apply + * immediately. + *

+ * Several {@link org.springframework.cache.Cache} operations can be performed asynchronously or deferred and this + * is the default behavior for {@link RedisCacheWriter}. Enable immediate writes in case a particular cache requires + * stronger consistency (i.e. Cache writes must be visible immediately). + *

+ * When using a {@link org.springframework.data.redis.connection.ReactiveRedisConnectionFactory reactive Redis + * driver}, immediate writes lead to blocking. + * + * @param enableImmediateWrites whether write operations must be visible immediately. + */ + RedisCacheWriterConfigurer immediateWrites(boolean enableImmediateWrites); + + } + + /** + * Interface that allows for configuring cache locking. + * + * @author Mark Paluch + * @since 4.0 + */ + interface CacheLockingConfigurer { + + /** + * Disable cache locking (default). + */ + void disable(); + + /** + * Enable cache locking with a default sleep time of {@code 50 milliseconds} and persistent lock keys. + */ + default void enable() { + enable(it -> {}); + } + + /** + * Enable cache locking. + */ + void enable(Consumer configurationConsumer); + + } + + /** + * Interface that allows for configuring cache locking options. + * + * @author Mark Paluch + * @since 4.0 + */ + interface CacheLockingConfiguration { + + /** + * Configure the sleep time between cache lock checks. Sleep time is applied to reattempt lock checks if a cache key + * is locked. + * + * @param sleepTime the sleep time, must not be {@literal null} and must be greater {@link Duration#ZERO}. + */ + CacheLockingConfiguration sleepTime(Duration sleepTime); + + /** + * Configure a {@link TtlFunction} to compute the lock timeout. + *

+ * If no TTL function is specified, the RedisCacheWriter persistent lock keys. Persistent lock keys need to be + * removed in case of failures (e.g. Redis crashes before a lock key is removed). Expiring lock keys can become + * subject to GC timing if lock keys expire while a garbage collection halts the JVM. + * + * @param ttlFunction the lock timeout function. + */ + CacheLockingConfiguration lockTimeout(TtlFunction ttlFunction); + + } + /** * Function to compute the time to live from the cache {@code key} and {@code value}. * diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java index d9650c8323..ece958863d 100644 --- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java +++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.cache; import static org.assertj.core.api.Assertions.*; +import static org.junit.Assume.*; import static org.springframework.data.redis.cache.RedisCacheWriter.*; import java.nio.charset.StandardCharsets; @@ -32,7 +33,6 @@ import java.util.function.Consumer; import org.awaitility.Awaitility; -import org.awaitility.core.ConditionFactory; import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeEach; @@ -62,8 +62,6 @@ public class DefaultRedisCacheWriterTests { private static final String CACHE_NAME = "default-redis-cache-writer-tests"; - static ConditionFactory await = Awaitility.with().pollDelay(Duration.ZERO).pollInSameThread(); - private String key = "key-1"; private String cacheKey = CACHE_NAME + "::" + key; @@ -89,8 +87,8 @@ void setUp() { // DATAREDIS-481, DATAREDIS-1082 void putShouldAddEternalEntry() { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, + configurer -> configurer.immediateWrites().collectStatistics()); writer.putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); @@ -106,8 +104,8 @@ void putShouldAddEternalEntry() { @Test // DATAREDIS-481 void putShouldAddExpiringEntry() { - nonLockingRedisCacheWriter(connectionFactory).putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, - Duration.ofSeconds(1)); + RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::immediateWrites).putIfAbsent(CACHE_NAME, + binaryCacheKey, binaryCacheValue, Duration.ofSeconds(1)); doWithConnection(connection -> { assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); @@ -120,12 +118,26 @@ void putShouldOverwriteExistingEternalEntry() { doWithConnection(connection -> connection.set(binaryCacheKey, "foo".getBytes())); - nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); + RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::immediateWrites).put(CACHE_NAME, + binaryCacheKey, binaryCacheValue, Duration.ZERO); doWithConnection(connection -> { + assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); + assertThat(connection.ttl(binaryCacheKey)).isEqualTo(-1); + }); + } + + @Test // GH-3236 + void nonBlockingPutShouldWriteEntry() { + + RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory); + assumeTrue(writer.supportsAsyncRetrieve()); - await.until(() -> connection.ttl(binaryCacheKey) == -1); + writer.put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); + doWithConnection(connection -> { + + Awaitility.await().pollInSameThread().pollDelay(Duration.ZERO).until(() -> connection.exists(binaryCacheKey)); assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); assertThat(connection.ttl(binaryCacheKey)).isEqualTo(-1); }); @@ -137,11 +149,10 @@ void putShouldOverwriteExistingExpiringEntryAndResetTtl() { doWithConnection(connection -> connection.set(binaryCacheKey, "foo".getBytes(), Expiration.from(1, TimeUnit.MINUTES), SetOption.upsert())); - nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue, - Duration.ofSeconds(5)); + RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::immediateWrites).put(CACHE_NAME, + binaryCacheKey, binaryCacheValue, Duration.ofSeconds(5)); doWithConnection(connection -> { - await.until(() -> connection.ttl(binaryCacheKey) < 50); assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); assertThat(connection.ttl(binaryCacheKey)).isGreaterThan(3).isLessThan(6); }); @@ -152,8 +163,8 @@ void getShouldReturnValue() { doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, + configurer -> configurer.immediateWrites().collectStatistics()); assertThat(writer.get(CACHE_NAME, binaryCacheKey)).isEqualTo(binaryCacheValue); assertThat(writer.getCacheStatistics(CACHE_NAME).getGets()).isOne(); @@ -172,8 +183,8 @@ void cacheHitRetrieveShouldIncrementStatistics() throws ExecutionException, Inte doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, + configurer -> configurer.immediateWrites().collectStatistics()); writer.retrieve(CACHE_NAME, binaryCacheKey).get(); @@ -185,8 +196,7 @@ void cacheHitRetrieveShouldIncrementStatistics() throws ExecutionException, Inte @EnabledOnRedisDriver(RedisDriver.LETTUCE) void storeShouldIncrementStatistics() throws ExecutionException, InterruptedException { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); writer.store(CACHE_NAME, binaryCacheKey, binaryCacheValue, null).get(); @@ -197,8 +207,7 @@ void storeShouldIncrementStatistics() throws ExecutionException, InterruptedExce @EnabledOnRedisDriver(RedisDriver.LETTUCE) void cacheMissRetrieveWithLoaderAsyncShouldIncrementStatistics() throws ExecutionException, InterruptedException { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); writer.retrieve(CACHE_NAME, binaryCacheKey).get(); @@ -209,8 +218,7 @@ void cacheMissRetrieveWithLoaderAsyncShouldIncrementStatistics() throws Executio @Test // DATAREDIS-481, DATAREDIS-1082 void putIfAbsentShouldAddEternalEntryWhenKeyDoesNotExist() { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); assertThat(writer.putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO)).isNull(); @@ -242,8 +250,7 @@ void putIfAbsentShouldNotAddEternalEntryWhenKeyAlreadyExist() { @Test // DATAREDIS-481, DATAREDIS-1082 void putIfAbsentShouldAddExpiringEntryWhenKeyDoesNotExist() { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); assertThat(writer.putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ofSeconds(5))).isNull(); @@ -257,8 +264,7 @@ void putIfAbsentShouldAddExpiringEntryWhenKeyDoesNotExist() { @Test // GH-2890 void getWithValueLoaderShouldStoreCacheValue() { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); writer.get(CACHE_NAME, binaryCacheKey, () -> binaryCacheValue, Duration.ofSeconds(5), true); @@ -271,33 +277,42 @@ void getWithValueLoaderShouldStoreCacheValue() { } @Test // DATAREDIS-481, DATAREDIS-1082 - void removeShouldDeleteEntry() throws InterruptedException { + void removeShouldDeleteEntry() { doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, + config -> config.collectStatistics().immediateWrites()); writer.remove(CACHE_NAME, binaryCacheKey); - if (writer.supportsAsyncRetrieve()) { - doWithConnection(connection -> { - await.until(() -> !connection.exists(binaryCacheKey)); - }); - } - doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isFalse()); assertThat(writer.getCacheStatistics(CACHE_NAME).getDeletes()).isOne(); } + @Test // GH-3236 + void removeShouldNonblockingDeleteEntry() { + + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); + assumeTrue(writer.supportsAsyncRetrieve()); + + writer.remove(CACHE_NAME, binaryCacheKey); + + doWithConnection(connection -> { + Awaitility.await().pollInSameThread().pollDelay(Duration.ZERO).until(() -> !connection.exists(binaryCacheKey)); + assertThat(connection.exists(binaryCacheKey)).isFalse(); + }); + + assertThat(writer.getCacheStatistics(CACHE_NAME).getDeletes()).isOne(); + } + @Test // GH-3236 void removeShouldDeleteEntryIfExists() { doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); assertThat(writer.removeIfPresent(CACHE_NAME, binaryCacheKey)).isTrue(); @@ -314,18 +329,34 @@ void cleanShouldRemoveAllKeysByPattern() { connection.set("foo".getBytes(), "bar".getBytes()); }); - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, + config -> config.collectStatistics().immediateWrites()); writer.clean(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8)); - if (writer.supportsAsyncRetrieve()) { - doWithConnection(connection -> { - await.until(() -> !connection.exists(binaryCacheKey)); - }); - } + doWithConnection(connection -> { + assertThat(connection.exists(binaryCacheKey)).isFalse(); + assertThat(connection.exists("foo".getBytes())).isTrue(); + }); + + assertThat(writer.getCacheStatistics(CACHE_NAME).getDeletes()).isOne(); + } + + @Test // GH-3236 + void nonBlockingCleanShouldRemoveAllKeysByPattern() { + + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); + assumeTrue(writer.supportsAsyncRetrieve()); + + doWithConnection(connection -> { + connection.set(binaryCacheKey, binaryCacheValue); + connection.set("foo".getBytes(), "bar".getBytes()); + }); + + writer.clean(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8)); doWithConnection(connection -> { + Awaitility.await().pollInSameThread().pollDelay(Duration.ZERO).until(() -> !connection.exists(binaryCacheKey)); assertThat(connection.exists(binaryCacheKey)).isFalse(); assertThat(connection.exists("foo".getBytes())).isTrue(); }); @@ -341,8 +372,7 @@ void invalidateShouldRemoveAllKeysByPattern() { connection.set("foo".getBytes(), "bar".getBytes()); }); - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, config -> config.collectStatistics()); assertThat(writer.invalidate(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8))).isTrue(); @@ -359,8 +389,8 @@ void nonLockingCacheWriterShouldIgnoreExistingLock() { ((DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory)).lock(CACHE_NAME); - nonLockingRedisCacheWriter(connectionFactory).putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, - Duration.ZERO); + RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::immediateWrites).putIfAbsent(CACHE_NAME, + binaryCacheKey, binaryCacheValue, Duration.ZERO); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isTrue(); @@ -372,11 +402,10 @@ void lockingCacheWriterShouldIgnoreExistingLockOnDifferenceCache() { ((DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory)).lock(CACHE_NAME); - lockingRedisCacheWriter(connectionFactory).put(CACHE_NAME + "-no-the-other-cache", binaryCacheKey, binaryCacheValue, - Duration.ZERO); + RedisCacheWriter.create(connectionFactory, it -> it.immediateWrites().enableLocking()) + .put(CACHE_NAME + "-no-the-other-cache", binaryCacheKey, binaryCacheValue, Duration.ZERO); doWithConnection(connection -> { - await.until(() -> connection.exists(binaryCacheKey)); assertThat(connection.exists(binaryCacheKey)).isTrue(); }); } @@ -384,8 +413,8 @@ void lockingCacheWriterShouldIgnoreExistingLockOnDifferenceCache() { @Test // DATAREDIS-481, DATAREDIS-1082 void lockingCacheWriterShouldWaitForLockRelease() throws InterruptedException { - DefaultRedisCacheWriter writer = (DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + DefaultRedisCacheWriter writer = DefaultRedisCacheWriter.create(connectionFactory, + it -> it.enableLocking().immediateWrites().collectStatistics()); writer.lock(CACHE_NAME); CountDownLatch beforeWrite = new CountDownLatch(1); @@ -411,12 +440,10 @@ void lockingCacheWriterShouldWaitForLockRelease() throws InterruptedException { afterWrite.await(); doWithConnection(connection -> { - await.until(() -> connection.exists(binaryCacheKey)); assertThat(connection.exists(binaryCacheKey)).isTrue(); }); assertThat(writer.getCacheStatistics(CACHE_NAME).getLockWaitDuration(TimeUnit.NANOSECONDS)).isGreaterThan(0); - } finally { th.interrupt(); } @@ -425,7 +452,8 @@ void lockingCacheWriterShouldWaitForLockRelease() throws InterruptedException { @Test // DATAREDIS-481 void lockingCacheWriterShouldExitWhenInterruptedWaitForLockRelease() throws InterruptedException { - DefaultRedisCacheWriter cw = (DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory); + DefaultRedisCacheWriter cw = DefaultRedisCacheWriter.create(connectionFactory, + it -> it.enableLocking().immediateWrites()); cw.lock(CACHE_NAME); Assumptions.assumeFalse(cw.supportsAsyncRetrieve()); @@ -437,7 +465,7 @@ void lockingCacheWriterShouldExitWhenInterruptedWaitForLockRelease() throws Inte Thread th = new Thread(() -> { DefaultRedisCacheWriter writer = new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50), - BatchStrategies.keys()) { + TtlFunction.persistent(), CacheStatisticsCollector.none(), BatchStrategies.keys(), false) { @Override boolean doCheckLock(String name, RedisConnection connection) { diff --git a/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java b/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java index 9cacd05e80..26f04c51ed 100644 --- a/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java +++ b/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java @@ -30,8 +30,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import org.awaitility.Awaitility; -import org.awaitility.core.ConditionFactory; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -63,8 +61,6 @@ public class LegacyRedisCacheTests { private static final String CACHE_NAME = "testCache"; private final boolean allowCacheNullValues; - static ConditionFactory await = Awaitility.with().pollDelay(Duration.ZERO).pollInSameThread(); - private ObjectFactory keyFactory; private ObjectFactory valueFactory; @@ -84,7 +80,6 @@ public LegacyRedisCacheTests(RedisTemplate template, ObjectFactory keyFa public static Collection testParams() { - System.out.println("tp"); Collection params = AbstractOperationsTestParams.testParams(); List target = new ArrayList<>(); @@ -112,7 +107,8 @@ private RedisCache createCache() { cacheConfiguration = cacheConfiguration.disableCachingNullValues(); } - return new RedisCache(CACHE_NAME, RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory), + return new RedisCache(CACHE_NAME, + RedisCacheWriter.create(connectionFactory, RedisCacheWriter.RedisCacheWriterConfigurer::immediateWrites), cacheConfiguration); } @@ -132,7 +128,7 @@ void testCachePut() { assertThat(value).isNotNull(); assertThat(cache.get(key)).isNull(); - putNow(key, value); + cache.put(key, value); ValueWrapper valueWrapper = cache.get(key); if (valueWrapper != null) { assertThat(valueWrapper.get()).isEqualTo(value); @@ -154,8 +150,6 @@ void testCacheClear() { cache.put(key2, value2); cache.clear(); - await.until(() -> cache.get(key1) == null && cache.get(key2) == null); - assertThat(cache.get(key2)).isNull(); assertThat(cache.get(key1)).isNull(); } @@ -182,7 +176,7 @@ void testConcurrentRead() throws Exception { Thread th = new Thread(() -> { cache.clear(); cache.put(k1, v1); - putNow(k2, v2); + cache.put(k2, v2); failed.set(v1.equals(cache.get(k1))); }, "concurrent-cache-access"); @@ -197,7 +191,7 @@ void testConcurrentRead() throws Exception { final Object value4 = getValue(); cache.put(key3, value3); - putNow(key4, value4); + cache.put(key4, value4); assertThat(cache.get(key1)).isNull(); assertThat(cache.get(key2)).isNull(); @@ -237,7 +231,7 @@ void testCacheGetShouldReturnCachedInstance() { Object key = getKey(); Object value = getValue(); - putNow(key, value); + cache.put(key, value); assertThat(value).isEqualTo(cache.get(key, Object.class)); } @@ -247,7 +241,7 @@ void testCacheGetShouldRetunInstanceOfCorrectType() { Object key = getKey(); Object value = getValue(); - putNow(key, value); + cache.put(key, value); assertThat(cache.get(key, value.getClass())).isInstanceOf(value.getClass()); } @@ -257,7 +251,7 @@ void testCacheGetShouldThrowExceptionOnInvalidType() { Object key = getKey(); Object value = getValue(); - putNow(key, value); + cache.put(key, value); assertThatIllegalStateException().isThrownBy(() -> cache.get(key, Cache.class)); } @@ -267,7 +261,7 @@ void testCacheGetShouldReturnNullIfNoCachedValueFound() { Object key = getKey(); Object value = getValue(); - putNow(key, value); + cache.put(key, value); Object invalidKey = "spring-data-redis".getBytes(); assertThat(cache.get(invalidKey, value.getClass())).isNull(); @@ -309,7 +303,7 @@ void cachePutWithNullShouldErrorAndLeaveExistingKeyUntouched() { Object key = getKey(); Object value = getValue(); - putNow(key, value); + cache.put(key, value); assertThat(cache.get(key).get()).isEqualTo(value); @@ -334,7 +328,7 @@ void cachePutWithNullShouldAddStuffToRedisWhenCachingNullIsEnabled() { Object key = getKey(); Object value = getValue(); - putNow(key, null); + cache.put(key, null); assertThat(cache.get(key, String.class)).isNull(); } @@ -378,26 +372,13 @@ void testCacheGetSynchronizedNullWithStoredNull() { assumeThat(allowCacheNullValues).as("Only suitable when cache does allow null values").isTrue(); Object key = getKey(); - putNow(key, null); + cache.put(key, null); Object cachedValue = cache.get(key, () -> null); assertThat(cachedValue).isNull(); } - private void putNow(Object key, Object value) { - putNow(cache, key, value); - } - - private void putNow(RedisCache cache, Object key, Object value) { - - cache.put(key, value); - - if (cache.getCacheWriter().supportsAsyncRetrieve()) { - await.until(() -> cache.get(key) != null); - } - } - @SuppressWarnings("unused") private static class CacheGetWithValueLoaderIsThreadSafe extends MultithreadedTestCase { diff --git a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java index 0f3a748389..c0e2251fcf 100644 --- a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java +++ b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java @@ -34,8 +34,6 @@ import java.util.function.Supplier; import org.assertj.core.api.ThrowingConsumer; -import org.awaitility.Awaitility; -import org.awaitility.core.ConditionFactory; import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -69,8 +67,6 @@ @MethodSource("testParams") public class RedisCacheTests { - static ConditionFactory await = Awaitility.with().pollDelay(Duration.ZERO).pollInSameThread(); - private String key = "key-1"; private String cacheKey = "cache::" + key; private byte[] binaryCacheKey = cacheKey.getBytes(StandardCharsets.UTF_8); @@ -106,28 +102,25 @@ void setUp() { @Test // DATAREDIS-481 void putShouldAddEntry() { - putNow(key, sample); + cache.put(key, sample); doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isTrue()); } @Test // GH-2379 void cacheShouldBeClearedByPattern() { - putNow(key, sample); + cache.put(key, sample); String keyPattern = "*" + key.substring(1); cache.clear(keyPattern); - if (cache.getCacheWriter().supportsAsyncRetrieve()) { - doWithConnection(connection -> await.until(() -> !connection.exists(binaryCacheKey))); - } doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isFalse()); } @Test // GH-2379 void cacheShouldNotBeClearedIfNoPatternMatch() { - putNow(key, sample); + cache.put(key, sample); String keyPattern = "*" + key.substring(1) + "tail"; cache.clear(keyPattern); @@ -139,7 +132,7 @@ void cacheShouldNotBeClearedIfNoPatternMatch() { // DATAREDIS-481 void putNullShouldAddEntryForNullValue() { - putNow(key, null); + cache.put(key, null); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isTrue(); @@ -210,7 +203,7 @@ void shouldReadAndWriteSimpleCacheKey() { SimpleKey key = new SimpleKey("param-1", "param-2"); - putNow(key, sample); + cache.put(key, sample); ValueWrapper result = cache.get(key); assertThat(result).isNotNull(); @@ -230,7 +223,7 @@ void shouldAllowComplexKeyWithToStringMethod() { ComplexKey key = new ComplexKey(sample.getFirstname(), sample.getBirthdate()); - putNow(key, sample); + cache.put(key, sample); ValueWrapper result = cache.get(key); @@ -295,10 +288,6 @@ void evictShouldRemoveKey() { cache.evict(key); - if (cache.getCacheWriter().supportsAsyncRetrieve()) { - await.until(() -> cache.get(key) == null); - } - doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); assertThat(connection.exists("other".getBytes())).isTrue(); @@ -332,10 +321,6 @@ void clearShouldClearCache() { cache.clear(); - if (cache.getCacheWriter().supportsAsyncRetrieve()) { - doWithConnection(connection -> await.until(() -> !connection.exists(binaryCacheKey))); - } - doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); assertThat(connection.exists("other".getBytes())).isTrue(); @@ -363,7 +348,7 @@ void clearShouldInvalidateCache() { void clearWithScanShouldClearCache() { RedisCache cache = new RedisCache("cache", - RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory, BatchStrategies.scan(25)), + RedisCacheWriter.create(connectionFactory, it -> it.immediateWrites().batchStrategy(BatchStrategies.scan(25))), RedisCacheConfiguration.defaultCacheConfig().serializeValuesWith(SerializationPair.fromSerializer(serializer))); doWithConnection(connection -> { @@ -375,11 +360,6 @@ void clearWithScanShouldClearCache() { cache.clear(); doWithConnection(connection -> { - - if (cache.getCacheWriter().supportsAsyncRetrieve()) { - await.until(() -> !connection.exists(binaryCacheKey)); - } - assertThat(connection.exists(binaryCacheKey)).isFalse(); assertThat(connection.exists("cache::foo".getBytes())).isFalse(); assertThat(connection.exists("other".getBytes())).isTrue(); @@ -416,11 +396,11 @@ void getWithCallableShouldNotResolveValueIfPresent() { void computePrefixCreatesCacheKeyCorrectly() { RedisCache cacheWithCustomPrefix = new RedisCache("cache", - RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory), + RedisCacheWriter.create(connectionFactory, RedisCacheWriter.RedisCacheWriterConfigurer::immediateWrites), RedisCacheConfiguration.defaultCacheConfig().serializeValuesWith(SerializationPair.fromSerializer(serializer)) .computePrefixWith(cacheName -> "_" + cacheName + "_")); - putNow(cacheWithCustomPrefix, key, sample); + cacheWithCustomPrefix.put(key, sample); doWithConnection( connection -> assertThat(connection.stringCommands().get("_cache_key-1".getBytes(StandardCharsets.UTF_8))) @@ -431,10 +411,11 @@ void computePrefixCreatesCacheKeyCorrectly() { void prefixCacheNameCreatesCacheKeyCorrectly() { RedisCache cacheWithCustomPrefix = new RedisCache("cache", - RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory), RedisCacheConfiguration.defaultCacheConfig() + RedisCacheWriter.create(connectionFactory, RedisCacheWriter.RedisCacheWriterConfigurer::immediateWrites), + RedisCacheConfiguration.defaultCacheConfig() .serializeValuesWith(SerializationPair.fromSerializer(serializer)).prefixCacheNameWith("redis::")); - putNow(cacheWithCustomPrefix, key, sample); + cacheWithCustomPrefix.put(key, sample); doWithConnection(connection -> assertThat( connection.stringCommands().get("redis::cache::key-1".getBytes(StandardCharsets.UTF_8))) @@ -461,7 +442,7 @@ void fetchKeyWithComputedPrefixReturnsExpectedResult() { void cacheShouldAllowListKeyCacheKeysOfSimpleTypes() { Object key = SimpleKeyGenerator.generateKey(Collections.singletonList("my-cache-key-in-a-list")); - putNow(key, sample); + cache.put(key, sample); ValueWrapper target = cache .get(SimpleKeyGenerator.generateKey(Collections.singletonList("my-cache-key-in-a-list"))); @@ -473,7 +454,7 @@ void cacheShouldAllowListKeyCacheKeysOfSimpleTypes() { void cacheShouldAllowArrayKeyCacheKeysOfSimpleTypes() { Object key = SimpleKeyGenerator.generateKey("my-cache-key-in-an-array"); - putNow(key, sample); + cache.put(key, sample); ValueWrapper target = cache.get(SimpleKeyGenerator.generateKey("my-cache-key-in-an-array")); @@ -485,7 +466,7 @@ void cacheShouldAllowListCacheKeysOfComplexTypes() { Object key = SimpleKeyGenerator .generateKey(Collections.singletonList(new ComplexKey(sample.getFirstname(), sample.getBirthdate()))); - putNow(key, sample); + cache.put(key, sample); ValueWrapper target = cache.get(SimpleKeyGenerator .generateKey(Collections.singletonList(new ComplexKey(sample.getFirstname(), sample.getBirthdate())))); @@ -498,7 +479,7 @@ void cacheShouldAllowMapCacheKeys() { Object key = SimpleKeyGenerator .generateKey(Collections.singletonMap("map-key", new ComplexKey(sample.getFirstname(), sample.getBirthdate()))); - putNow(key, sample); + cache.put(key, sample); ValueWrapper target = cache.get(SimpleKeyGenerator.generateKey( Collections.singletonMap("map-key", new ComplexKey(sample.getFirstname(), sample.getBirthdate())))); @@ -740,16 +721,18 @@ private RedisCacheWriter usingRedisCacheWriter() { } private RedisCacheWriter usingLockingRedisCacheWriter() { - return RedisCacheWriter.lockingRedisCacheWriter(this.connectionFactory); + return usingLockingRedisCacheWriter(Duration.ofMillis(50L)); } private RedisCacheWriter usingLockingRedisCacheWriter(Duration sleepTime) { - return RedisCacheWriter.lockingRedisCacheWriter(this.connectionFactory, sleepTime, - RedisCacheWriter.TtlFunction.persistent(), BatchStrategies.keys()); + return RedisCacheWriter.create(this.connectionFactory, config -> { + config.immediateWrites().enableLocking(it -> it.sleepTime(sleepTime)).batchStrategy(BatchStrategies.keys()); + }); } private RedisCacheWriter usingNonLockingRedisCacheWriter() { - return RedisCacheWriter.nonLockingRedisCacheWriter(this.connectionFactory); + return RedisCacheWriter.create(this.connectionFactory, + config -> config.immediateWrites().batchStrategy(BatchStrategies.keys())); } private @Nullable Object unwrap(@Nullable Object value) { @@ -770,20 +753,6 @@ void doWithConnection(ThrowingConsumer callback) { } } - private void putNow(Object key, Object value) { - putNow(cache, key, value); - } - - private void putNow(RedisCache cache, Object key, Object value) { - - byte[] cacheKey = cache.createAndConvertCacheKey(key); - cache.put(key, value); - - if (cache.getCacheWriter().supportsAsyncRetrieve()) { - doWithConnection(connection -> await.until(() -> connection.exists(cacheKey))); - } - } - static class Person implements Serializable { private String firstname; From 1baa7e14d3594b152322c5c9dafee9605d090c55 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 27 Oct 2025 13:04:27 +0100 Subject: [PATCH 06/11] Typo. --- .../springframework/data/redis/cache/RedisCacheWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java index 1bf425a382..acc44dd61f 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java @@ -493,9 +493,9 @@ static TtlFunction just(Duration duration) { } /** - * Returns a {@link TtlFunction} to create persistent entires that do not expire. + * Returns a {@link TtlFunction} to create persistent entries that do not expire. * - * @return a {@link TtlFunction} to create persistent entires that do not expire. + * @return a {@link TtlFunction} to create persistent entries that do not expire. */ static TtlFunction persistent() { return just(NO_EXPIRATION); From cfbd8e8370244eb1daa8383d50ae2fa5263d8bf4 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 27 Oct 2025 13:05:10 +0100 Subject: [PATCH 07/11] Format. --- .../data/redis/cache/DefaultRedisCacheWriter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index 4a8bd0c25d..4974c4d6b9 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -216,6 +216,7 @@ public CacheLockingConfiguration sleepTime(Duration sleepTime) { "Lock sleep time must not be null zero or negative"); this.lockSleepTime = sleepTime; + return this; } @@ -223,8 +224,8 @@ public CacheLockingConfiguration sleepTime(Duration sleepTime) { public CacheLockingConfiguration lockTimeout(TtlFunction ttlFunction) { Assert.notNull(ttlFunction, "TTL function must not be null"); - this.lockTtlFunction = ttlFunction; + return this; } From 74e90b962802f58b11bc12ed2a270d39d56724b4 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Mon, 27 Oct 2025 10:28:16 +0100 Subject: [PATCH 08/11] Undo change to record leaking underlying Iterator --- .../data/redis/cache/BatchStrategies.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java b/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java index f299f9ecd6..f90af59445 100644 --- a/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java +++ b/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java @@ -132,7 +132,16 @@ public long cleanCache(RedisConnection connection, String name, byte[] pattern) * * @param */ - record PartitionIterator(Iterator iterator, int size) implements Iterator> { + static class PartitionIterator implements Iterator> { + + private final Iterator iterator; + private final int size; + + PartitionIterator(Iterator iterator, int size) { + + this.iterator = iterator; + this.size = size; + } @Override public boolean hasNext() { From 5740262d9d54d623c8321779b14b02de5c2fe654 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Mon, 27 Oct 2025 10:29:50 +0100 Subject: [PATCH 09/11] Rename clean to clear which aligns with what the method does. --- .../redis/cache/DefaultRedisCacheWriter.java | 23 ++++++----- .../data/redis/cache/RedisCacheWriter.java | 24 +++++++++-- .../cache/DefaultRedisCacheWriterTests.java | 41 +++++++++++++++++-- 3 files changed, 69 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index 4974c4d6b9..4957804362 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -438,13 +438,13 @@ public boolean removeIfPresent(String name, byte[] key) { } @Override - public void clean(String name, byte[] pattern) { + public void clear(String name, byte[] pattern) { Assert.notNull(name, "Name must not be null"); Assert.notNull(pattern, "Pattern must not be null"); if (writeAsynchronously()) { - asyncCacheWriter.clean(name, pattern, batchStrategy) + asyncCacheWriter.clear(name, pattern, batchStrategy) .thenAccept(deleteCount -> statistics.incDeletesBy(name, deleteCount.intValue())); return; } @@ -651,8 +651,9 @@ interface AsyncCacheWriter { * @param pattern {@link String pattern} used to match Redis keys to clear. * @param batchStrategy strategy to use. * @return a future that signals completion emitting the number of removed keys. + * @since 4.0 */ - CompletableFuture clean(String name, byte[] pattern, BatchStrategy batchStrategy); + CompletableFuture clear(String name, byte[] pattern, BatchStrategy batchStrategy); } @@ -686,7 +687,7 @@ public CompletableFuture remove(String name, byte[] key) { } @Override - public CompletableFuture clean(String name, byte[] pattern, BatchStrategy batchStrategy) { + public CompletableFuture clear(String name, byte[] pattern, BatchStrategy batchStrategy) { throw new UnsupportedOperationException("async clean not supported"); } @@ -701,10 +702,10 @@ public CompletableFuture clean(String name, byte[] pattern, BatchStrategy class AsynchronousCacheWriterDelegate implements AsyncCacheWriter { private static final int DEFAULT_SCAN_BATCH_SIZE = 64; - private final int cleanBatchSize; + private final int clearBatchSize; public AsynchronousCacheWriterDelegate() { - this.cleanBatchSize = batchStrategy instanceof BatchStrategies.Scan scan ? scan.batchSize() + this.clearBatchSize = batchStrategy instanceof BatchStrategies.Scan scan ? scan.batchSize() : DEFAULT_SCAN_BATCH_SIZE; } @@ -765,14 +766,14 @@ public CompletableFuture remove(String name, byte[] key) { } @Override - public CompletableFuture clean(String name, byte[] pattern, BatchStrategy batchStrategy) { + public CompletableFuture clear(String name, byte[] pattern, BatchStrategy batchStrategy) { return doWithConnection(connection -> { - return doWithLocking(name, pattern, null, connection, () -> doClean(pattern, connection)); + return doWithLocking(name, pattern, null, connection, () -> doClear(pattern, connection)); }); } - private Mono doClean(byte[] pattern, ReactiveRedisConnection connection) { + private Mono doClear(byte[] pattern, ReactiveRedisConnection connection) { ReactiveKeyCommands commands = connection.keyCommands(); @@ -781,11 +782,11 @@ private Mono doClean(byte[] pattern, ReactiveRedisConnection connection) { if (batchStrategy instanceof BatchStrategies.Keys) { keys = commands.keys(ByteBuffer.wrap(pattern)).flatMapMany(Flux::fromIterable); } else { - keys = commands.scan(ScanOptions.scanOptions().count(cleanBatchSize).match(pattern).build()); + keys = commands.scan(ScanOptions.scanOptions().count(clearBatchSize).match(pattern).build()); } return keys - .buffer(cleanBatchSize) // + .buffer(clearBatchSize) // .flatMap(commands::mUnlink) // .collect(Collectors.summingLong(Long::longValue)); } diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java index acc44dd61f..ed0e569734 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java @@ -285,8 +285,24 @@ default boolean removeIfPresent(String name, byte[] key) { * * @param name cache name must not be {@literal null}. * @param pattern pattern for the keys to remove. Must not be {@literal null}. + * @deprecated since 4.0 in favor of {@link #clear(String, byte[])} */ - void clean(String name, byte[] pattern); + @Deprecated(since = "4.0", forRemoval = true) + default void clean(String name, byte[] pattern) { + clear(name, pattern); + } + + /** + * Remove all keys following the given pattern. + *

+ * Actual clearing may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still + * seeing the entries. + * + * @param name cache name must not be {@literal null}. + * @param pattern pattern for the keys to remove. Must not be {@literal null}. + * @since 4.0 + */ + void clear(String name, byte[] pattern); /** * Remove all keys following the given pattern expecting all entries to be immediately invisible for subsequent @@ -298,7 +314,7 @@ default boolean removeIfPresent(String name, byte[] key) { * presence of entries could not be determined). */ default boolean invalidate(String name, byte[] pattern) { - clean(name, pattern); + clear(name, pattern); return false; } @@ -379,7 +395,7 @@ default RedisCacheWriterConfigurer enableLocking(Consumer * Several {@link org.springframework.cache.Cache} operations can be performed asynchronously or deferred and this @@ -395,7 +411,7 @@ default RedisCacheWriterConfigurer immediateWrites() { /** * Configure whether to use immediate writes (i.e. write operations such as - * {@link RedisCacheWriter#put(String, byte[], byte[], Duration)} or {@link #clean(String, byte[])}) shall apply + * {@link RedisCacheWriter#put(String, byte[], byte[], Duration)} or {@link #clear(String, byte[])}) shall apply * immediately. *

* Several {@link org.springframework.cache.Cache} operations can be performed asynchronously or deferred and this diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java index ece958863d..1bc52446d2 100644 --- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java +++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java @@ -15,9 +15,12 @@ */ package org.springframework.data.redis.cache; -import static org.assertj.core.api.Assertions.*; -import static org.junit.Assume.*; -import static org.springframework.data.redis.cache.RedisCacheWriter.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assume.assumeTrue; +import static org.springframework.data.redis.cache.RedisCacheWriter.RedisCacheWriterConfigurer; +import static org.springframework.data.redis.cache.RedisCacheWriter.TtlFunction; +import static org.springframework.data.redis.cache.RedisCacheWriter.lockingRedisCacheWriter; +import static org.springframework.data.redis.cache.RedisCacheWriter.nonLockingRedisCacheWriter; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -31,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Stream; import org.awaitility.Awaitility; import org.jspecify.annotations.Nullable; @@ -39,8 +43,9 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedClass; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; - import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisStringCommands.SetOption; @@ -78,6 +83,10 @@ public static Collection testParams() { return CacheTestParams.justConnectionFactories(); } + static Stream clearRemovesAffectedKeysArgs() { + return Stream.of(Arguments.of(BatchStrategies.keys()), Arguments.of(BatchStrategies.scan(37))); + } + @BeforeEach void setUp() { doWithConnection(RedisConnection::flushAll); @@ -533,6 +542,30 @@ void noOpStatisticsCollectorReturnsEmptyStatsInstance() { assertThat(stats.getPuts()).isZero(); } + @ParameterizedTest // GH-3236 + @MethodSource("clearRemovesAffectedKeysArgs") + void clearRemovesAffectedKeys(BatchStrategy batchStrategy) { + + DefaultRedisCacheWriter cw = (DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory, batchStrategy) + .withStatisticsCollector(CacheStatisticsCollector.create()); + + int nrKeys = 100; + for (int i = 0; i < nrKeys; i++) { + cw.putIfAbsent(CACHE_NAME, "%s::key-%s".formatted(CACHE_NAME, i).getBytes(StandardCharsets.UTF_8), + binaryCacheValue, Duration.ofSeconds(30)); + } + + cw.clear(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8)); + + doWithConnection(connection -> { + Awaitility.await().pollInSameThread().atMost(Duration.ofSeconds(5)).pollDelay(Duration.ZERO) + .until(() -> !connection.exists(binaryCacheKey)); + assertThat(connection.keyCommands().exists(binaryCacheKey)).isFalse(); + }); + + assertThat(cw.getCacheStatistics(CACHE_NAME).getDeletes()).isEqualTo(nrKeys); + } + @Test // GH-1686 @Disabled("Occasional failures on CI but not locally") void doLockShouldGetLock() throws InterruptedException { From fdb30a58b5b648cee6b50f284f1d6e64dd655622 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Tue, 28 Oct 2025 08:42:00 +0100 Subject: [PATCH 10/11] Additional unit tests --- .../DefaultRedisCachWriterUnitTests.java | 84 +++++++++++++++++-- 1 file changed, 79 insertions(+), 5 deletions(-) diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java index 0e89ee752c..2d45917f91 100644 --- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java +++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java @@ -15,10 +15,23 @@ */ package org.springframework.data.redis.cache; -import static org.assertj.core.api.Assertions.*; -import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.*; - +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.withSettings; + +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; import java.time.Duration; import org.junit.jupiter.api.BeforeEach; @@ -26,8 +39,11 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; - +import org.mockito.quality.Strictness; import org.springframework.dao.PessimisticLockingFailureException; +import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.connection.ReactiveStringCommands; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisKeyCommands; @@ -127,4 +143,62 @@ void mustNotUnlockWhenLockingFails() { verify(mockKeyCommands, never()).del(any()); } + + @Test // GH-3236 + void usesAsyncPutIfPossible() { + + byte[] key = "TestKey".getBytes(); + byte[] value = "TestValue".getBytes(); + + RedisConnectionFactory connectionFactory = mock(RedisConnectionFactory.class, + withSettings().extraInterfaces(ReactiveRedisConnectionFactory.class)); + ReactiveRedisConnection mockConnection = mock(ReactiveRedisConnection.class); + ReactiveStringCommands mockStringCommands = mock(ReactiveStringCommands.class); + + doReturn(mockConnection).when((ReactiveRedisConnectionFactory) connectionFactory).getReactiveConnection(); + doReturn(mockStringCommands).when(mockConnection).stringCommands(); + doReturn(Mono.just(value)).when(mockStringCommands).set(any(), any(), any(), any()); + + RedisCacheWriter cacheWriter = RedisCacheWriter.create(connectionFactory, cfg -> { + cfg.immediateWrites(false); + }); + + cacheWriter.put("TestCache", key, value, null); + + verify(mockConnection, times(1)).stringCommands(); + verify(mockStringCommands, times(1)).set(eq(ByteBuffer.wrap(key)), any()); + } + + @Test // GH-3236 + void usesBlockingWritesIfConfiguredWithImmediateWritesEnabled() { + + byte[] key = "TestKey".getBytes(); + byte[] value = "TestValue".getBytes(); + + RedisConnectionFactory connectionFactory = mock(RedisConnectionFactory.class, + withSettings().strictness(Strictness.LENIENT).extraInterfaces(ReactiveRedisConnectionFactory.class)); + ReactiveRedisConnection reactiveMockConnection = mock(ReactiveRedisConnection.class, + withSettings().strictness(Strictness.LENIENT)); + ReactiveStringCommands reactiveMockStringCommands = mock(ReactiveStringCommands.class, + withSettings().strictness(Strictness.LENIENT)); + + doReturn(reactiveMockConnection).when((ReactiveRedisConnectionFactory) connectionFactory).getReactiveConnection(); + doReturn(reactiveMockStringCommands).when(reactiveMockConnection).stringCommands(); + + RedisStringCommands mockStringCommands = mock(RedisStringCommands.class); + + doReturn(mockStringCommands).when(this.mockConnection).stringCommands(); + doReturn(this.mockConnection).when(connectionFactory).getConnection(); + + RedisCacheWriter cacheWriter = RedisCacheWriter.create(connectionFactory, cfg -> { + cfg.immediateWrites(true); + }); + + cacheWriter.put("TestCache", key, value, null); + + verify(this.mockConnection, times(1)).stringCommands(); + verify(mockStringCommands, times(1)).set(eq(key), any()); + verify(reactiveMockConnection, never()).stringCommands(); + verify(reactiveMockStringCommands, never()).set(eq(ByteBuffer.wrap(key)), any()); + } } From b52edf67aad7b167736304b87efe093ea5f4ec8c Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Tue, 28 Oct 2025 08:42:26 +0100 Subject: [PATCH 11/11] Align naming of RedisCache and RedisCacheWriter --- .../redis/cache/DefaultRedisCacheWriter.java | 6 ++--- .../data/redis/cache/RedisCache.java | 4 ++-- .../data/redis/cache/RedisCacheWriter.java | 23 ++++++++++++++++--- .../cache/DefaultRedisCacheWriterTests.java | 20 ++++++++-------- 4 files changed, 35 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index 4957804362..600cb4f0b7 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -416,7 +416,7 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat } @Override - public void remove(String name, byte[] key) { + public void evict(String name, byte[] key) { Assert.notNull(name, "Name must not be null"); Assert.notNull(key, "Key must not be null"); @@ -424,12 +424,12 @@ public void remove(String name, byte[] key) { if (writeAsynchronously()) { asyncCacheWriter.remove(name, key).thenRun(() -> statistics.incDeletes(name)); } else { - removeIfPresent(name, key); + evictIfPresent(name, key); } } @Override - public boolean removeIfPresent(String name, byte[] key) { + public boolean evictIfPresent(String name, byte[] key) { Long removals = execute(name, connection -> connection.keyCommands().del(key)); statistics.incDeletes(name); diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCache.java b/src/main/java/org/springframework/data/redis/cache/RedisCache.java index 0f796a91f2..c3137b3d9b 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCache.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCache.java @@ -265,12 +265,12 @@ public void clearStatistics() { @Override public void evict(Object key) { - getCacheWriter().remove(getName(), createAndConvertCacheKey(key)); + getCacheWriter().evict(getName(), createAndConvertCacheKey(key)); } @Override public boolean evictIfPresent(Object key) { - return getCacheWriter().removeIfPresent(getName(), createAndConvertCacheKey(key)); + return getCacheWriter().evictIfPresent(getName(), createAndConvertCacheKey(key)); } @Override diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java index ed0e569734..6633817888 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java @@ -260,8 +260,24 @@ default CompletableFuture retrieve(String name, byte[] key) { * * @param name cache name must not be {@literal null}. * @param key key for the cache entry. Must not be {@literal null}. + * @deprecated since 4.0 in favor of {@link #evict(String, byte[])} */ - void remove(String name, byte[] key); + @Deprecated(since = "4.0", forRemoval = true) + default void remove(String name, byte[] key) { + evict(name, key); + } + + /** + * Remove the given key from Redis. + *

+ * Actual eviction may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still + * seeing the entry. + * + * @param name cache name must not be {@literal null}. + * @param key key for the cache entry. Must not be {@literal null}. + * @since 4.0 + */ + void evict(String name, byte[] key); /** * Remove the given key from Redis if it is present, expecting the key to be immediately invisible for subsequent @@ -271,9 +287,10 @@ default CompletableFuture retrieve(String name, byte[] key) { * @param key key for the cache entry. Must not be {@literal null}. * @return {@code true} if the cache was known to have a mapping for this key before, {@code false} if it did not (or * if prior presence could not be determined). + * @since 4.0 */ - default boolean removeIfPresent(String name, byte[] key) { - remove(name, key); + default boolean evictIfPresent(String name, byte[] key) { + evict(name, key); return false; } diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java index 1bc52446d2..265bc7a18a 100644 --- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java +++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java @@ -286,14 +286,14 @@ void getWithValueLoaderShouldStoreCacheValue() { } @Test // DATAREDIS-481, DATAREDIS-1082 - void removeShouldDeleteEntry() { + void evictShouldDeleteEntry() { doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, config -> config.collectStatistics().immediateWrites()); - writer.remove(CACHE_NAME, binaryCacheKey); + writer.evict(CACHE_NAME, binaryCacheKey); doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isFalse()); @@ -301,12 +301,12 @@ void removeShouldDeleteEntry() { } @Test // GH-3236 - void removeShouldNonblockingDeleteEntry() { + void evictShouldNonblockingDeleteEntry() { RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); assumeTrue(writer.supportsAsyncRetrieve()); - writer.remove(CACHE_NAME, binaryCacheKey); + writer.evict(CACHE_NAME, binaryCacheKey); doWithConnection(connection -> { Awaitility.await().pollInSameThread().pollDelay(Duration.ZERO).until(() -> !connection.exists(binaryCacheKey)); @@ -317,13 +317,13 @@ void removeShouldNonblockingDeleteEntry() { } @Test // GH-3236 - void removeShouldDeleteEntryIfExists() { + void evictIfPresentShouldDeleteEntryIfExists() { doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); - assertThat(writer.removeIfPresent(CACHE_NAME, binaryCacheKey)).isTrue(); + assertThat(writer.evictIfPresent(CACHE_NAME, binaryCacheKey)).isTrue(); doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isFalse()); @@ -331,7 +331,7 @@ void removeShouldDeleteEntryIfExists() { } @Test // DATAREDIS-418, DATAREDIS-1082 - void cleanShouldRemoveAllKeysByPattern() { + void clearShouldRemoveAllKeysByPattern() { doWithConnection(connection -> { connection.set(binaryCacheKey, binaryCacheValue); @@ -341,7 +341,7 @@ void cleanShouldRemoveAllKeysByPattern() { RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, config -> config.collectStatistics().immediateWrites()); - writer.clean(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8)); + writer.clear(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8)); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); @@ -352,7 +352,7 @@ void cleanShouldRemoveAllKeysByPattern() { } @Test // GH-3236 - void nonBlockingCleanShouldRemoveAllKeysByPattern() { + void nonBlockingClearShouldRemoveAllKeysByPattern() { RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); assumeTrue(writer.supportsAsyncRetrieve()); @@ -362,7 +362,7 @@ void nonBlockingCleanShouldRemoveAllKeysByPattern() { connection.set("foo".getBytes(), "bar".getBytes()); }); - writer.clean(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8)); + writer.clear(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8)); doWithConnection(connection -> { Awaitility.await().pollInSameThread().pollDelay(Duration.ZERO).until(() -> !connection.exists(binaryCacheKey));