> callback) {
ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory) connectionFactory;
return Mono.usingWhen(Mono.fromSupplier(cf::getReactiveConnection), //
- it -> Mono.fromCompletionStage(callback.apply(it)), //
+ callback::apply, //
ReactiveRedisConnection::closeLater) //
.toFuture();
}
+
}
+
}
diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCache.java b/src/main/java/org/springframework/data/redis/cache/RedisCache.java
index a2ec676b5a..c3137b3d9b 100644
--- a/src/main/java/org/springframework/data/redis/cache/RedisCache.java
+++ b/src/main/java/org/springframework/data/redis/cache/RedisCache.java
@@ -235,9 +235,12 @@ public void clear() {
}
/**
- * Clear keys that match the given {@link String keyPattern}.
+ * Clear keys that match the given {@link String keyPattern}. Useful when cache keys are formatted in a style where
+ * Redis patterns can be used for matching these.
*
- * Useful when cache keys are formatted in a style where Redis patterns can be used for matching these.
+ * Actual clearing may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still
+ * seeing the entries. This may for example be the case with transactional cache decorators. Use {@link #invalidate()}
+ * for guaranteed immediate removal of entries.
*
* @param keyPattern {@link String pattern} used to match Redis keys to clear.
* @since 3.0
@@ -246,6 +249,11 @@ public void clear(String keyPattern) {
getCacheWriter().clean(getName(), createAndConvertCacheKey(keyPattern));
}
+ @Override
+ public boolean invalidate() {
+ return getCacheWriter().invalidate(getName(), createAndConvertCacheKey("*"));
+ }
+
/**
* Reset all statistics counters and gauges for this cache.
*
@@ -257,7 +265,12 @@ public void clearStatistics() {
@Override
public void evict(Object key) {
- getCacheWriter().remove(getName(), createAndConvertCacheKey(key));
+ getCacheWriter().evict(getName(), createAndConvertCacheKey(key));
+ }
+
+ @Override
+ public boolean evictIfPresent(Object key) {
+ return getCacheWriter().evictIfPresent(getName(), createAndConvertCacheKey(key));
}
@Override
@@ -480,7 +493,7 @@ private String convertCollectionLikeOrMapKey(Object key, TypeDescriptor source)
throw new IllegalArgumentException("Cannot convert cache key [%s] to String".formatted(key));
}
- private byte[] createAndConvertCacheKey(Object key) {
+ byte[] createAndConvertCacheKey(Object key) {
return serializeCacheKey(createCacheKey(key));
}
diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java
index ef63003afc..6633817888 100644
--- a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java
+++ b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java
@@ -17,6 +17,7 @@
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.jspecify.annotations.Nullable;
@@ -41,7 +42,21 @@
public interface RedisCacheWriter extends CacheStatisticsProvider {
/**
- * Create new {@link RedisCacheWriter} without locking behavior.
+ * Create new {@link RedisCacheWriter} configure it through {@link RedisCacheWriterConfigurer}. The cache writer
+ * defaults does not lock the cache by default using {@link BatchStrategies#keys()}.
+ *
+ * @param connectionFactory the connection factory to use.
+ * @param configurerConsumer a configuration function that configures {@link RedisCacheWriterConfigurer}.
+ * @return new instance of {@link DefaultRedisCacheWriter}.
+ * @since 4.0
+ */
+ static RedisCacheWriter create(RedisConnectionFactory connectionFactory,
+ Consumer configurerConsumer) {
+ return DefaultRedisCacheWriter.create(connectionFactory, configurerConsumer);
+ }
+
+ /**
+ * Create new {@link RedisCacheWriter} without locking behavior using {@link BatchStrategies#keys()}.
*
* @param connectionFactory must not be {@literal null}.
* @return new instance of {@link DefaultRedisCacheWriter}.
@@ -60,15 +75,11 @@ static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connec
*/
static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory,
BatchStrategy batchStrategy) {
-
- Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
- Assert.notNull(batchStrategy, "BatchStrategy must not be null");
-
- return new DefaultRedisCacheWriter(connectionFactory, batchStrategy);
+ return create(connectionFactory, config -> config.batchStrategy(batchStrategy));
}
/**
- * Create new {@link RedisCacheWriter} with locking behavior.
+ * Create new {@link RedisCacheWriter} with locking behavior using {@link BatchStrategies#keys()}.
*
* @param connectionFactory must not be {@literal null}.
* @return new instance of {@link DefaultRedisCacheWriter}.
@@ -88,7 +99,8 @@ static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectio
static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectionFactory,
BatchStrategy batchStrategy) {
- return lockingRedisCacheWriter(connectionFactory, Duration.ofMillis(50), TtlFunction.persistent(), batchStrategy);
+ return create(connectionFactory,
+ it -> it.batchStrategy(batchStrategy).cacheLocking(CacheLockingConfigurer::enable));
}
/**
@@ -105,10 +117,8 @@ static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectio
static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime,
TtlFunction lockTtlFunction, BatchStrategy batchStrategy) {
- Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
-
- return new DefaultRedisCacheWriter(connectionFactory, sleepTime, lockTtlFunction, CacheStatisticsCollector.none(),
- batchStrategy);
+ return create(connectionFactory, it -> it.batchStrategy(batchStrategy)
+ .enableLocking(locking -> locking.sleepTime(sleepTime).lockTimeout(lockTtlFunction)));
}
/**
@@ -244,19 +254,86 @@ default CompletableFuture retrieve(String name, byte[] key) {
/**
* Remove the given key from Redis.
+ *
+ * Actual eviction may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still
+ * seeing the entry.
+ *
+ * @param name cache name must not be {@literal null}.
+ * @param key key for the cache entry. Must not be {@literal null}.
+ * @deprecated since 4.0 in favor of {@link #evict(String, byte[])}
+ */
+ @Deprecated(since = "4.0", forRemoval = true)
+ default void remove(String name, byte[] key) {
+ evict(name, key);
+ }
+
+ /**
+ * Remove the given key from Redis.
+ *
+ * Actual eviction may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still
+ * seeing the entry.
+ *
+ * @param name cache name must not be {@literal null}.
+ * @param key key for the cache entry. Must not be {@literal null}.
+ * @since 4.0
+ */
+ void evict(String name, byte[] key);
+
+ /**
+ * Remove the given key from Redis if it is present, expecting the key to be immediately invisible for subsequent
+ * lookups.
*
* @param name cache name must not be {@literal null}.
* @param key key for the cache entry. Must not be {@literal null}.
+ * @return {@code true} if the cache was known to have a mapping for this key before, {@code false} if it did not (or
+ * if prior presence could not be determined).
+ * @since 4.0
+ */
+ default boolean evictIfPresent(String name, byte[] key) {
+ evict(name, key);
+ return false;
+ }
+
+ /**
+ * Remove all keys following the given pattern.
+ *
+ * Actual clearing may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still
+ * seeing the entries.
+ *
+ * @param name cache name must not be {@literal null}.
+ * @param pattern pattern for the keys to remove. Must not be {@literal null}.
+ * @deprecated since 4.0 in favor of {@link #clear(String, byte[])}
*/
- void remove(String name, byte[] key);
+ @Deprecated(since = "4.0", forRemoval = true)
+ default void clean(String name, byte[] pattern) {
+ clear(name, pattern);
+ }
/**
* Remove all keys following the given pattern.
+ *
+ * Actual clearing may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still
+ * seeing the entries.
+ *
+ * @param name cache name must not be {@literal null}.
+ * @param pattern pattern for the keys to remove. Must not be {@literal null}.
+ * @since 4.0
+ */
+ void clear(String name, byte[] pattern);
+
+ /**
+ * Remove all keys following the given pattern expecting all entries to be immediately invisible for subsequent
+ * lookups.
*
* @param name cache name must not be {@literal null}.
* @param pattern pattern for the keys to remove. Must not be {@literal null}.
+ * @return {@code true} if the cache was known to have mappings before, {@code false} if it did not (or if prior
+ * presence of entries could not be determined).
*/
- void clean(String name, byte[] pattern);
+ default boolean invalidate(String name, byte[] pattern) {
+ clear(name, pattern);
+ return false;
+ }
/**
* Reset all statistics counters and gauges for this cache.
@@ -273,6 +350,156 @@ default CompletableFuture retrieve(String name, byte[] key) {
*/
RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheStatisticsCollector);
+ /**
+ * Interface that allows for configuring a {@link RedisCacheWriter}.
+ *
+ * @author Mark Paluch
+ * @since 4.0
+ */
+ interface RedisCacheWriterConfigurer {
+
+ /**
+ * Configure the {@link CacheStatisticsCollector} to use. This is useful for plugging in and/or customizing
+ * statistics collection.
+ */
+ default RedisCacheWriterConfigurer collectStatistics() {
+ return collectStatistics(CacheStatisticsCollector.create());
+ }
+
+ /**
+ * Configure the {@link CacheStatisticsCollector} to use. This is useful for plugging in and/or customizing
+ * statistics collection.
+ *
+ * If no statistics collector is specified, no statistics will be recorded. Statistics collection can be
+ * reconfigured on the built RedisCacheWriter by invoking
+ * {@code RedisCacheWriter#withStatisticsCollector(CacheStatisticsCollector)}.
+ *
+ * @param cacheStatisticsCollector the statistics collector to use.
+ */
+ RedisCacheWriterConfigurer collectStatistics(CacheStatisticsCollector cacheStatisticsCollector);
+
+ /**
+ * Configure the {@link BatchStrategy} when clearing the cache (i.e. bulk removal of cache keys).
+ *
+ * If no batch strategy is specified, the RedisCacheWriter uses {@link BatchStrategies#keys()};
+ *
+ * @param batchStrategy the batch strategy to use.
+ */
+ RedisCacheWriterConfigurer batchStrategy(BatchStrategy batchStrategy);
+
+ /**
+ * Enable cache locking to synchronize cache access across multiple cache instances.
+ */
+ default RedisCacheWriterConfigurer enableLocking() {
+ return cacheLocking(it -> it.enable(config -> {}));
+ }
+
+ /**
+ * Enable cache locking to synchronize cache access across multiple cache instances.
+ *
+ * @param configurerConsumer a configuration function that configures {@link CacheLockingConfiguration}.
+ */
+ default RedisCacheWriterConfigurer enableLocking(Consumer configurerConsumer) {
+ return cacheLocking(it -> it.enable(configurerConsumer));
+ }
+
+ /**
+ * Configure cache locking to synchronize cache access across multiple cache instances.
+ *
+ * @param configurerConsumer a configuration function that configures {@link CacheLockingConfigurer}.
+ */
+ RedisCacheWriterConfigurer cacheLocking(Consumer configurerConsumer);
+
+ /**
+ * Use immediate writes (i.e. write operations such as
+ * {@link RedisCacheWriter#put(String, byte[], byte[], Duration)} or {@link #clear(String, byte[])}) shall apply
+ * immediately.
+ *
+ * Several {@link org.springframework.cache.Cache} operations can be performed asynchronously or deferred and this
+ * is the default behavior for {@link RedisCacheWriter}. Enable immediate writes in case a particular cache requires
+ * stronger consistency (i.e. Cache writes must be visible immediately).
+ *
+ * When using a {@link org.springframework.data.redis.connection.ReactiveRedisConnectionFactory reactive Redis
+ * driver}, immediate writes lead to blocking.
+ */
+ default RedisCacheWriterConfigurer immediateWrites() {
+ return immediateWrites(true);
+ }
+
+ /**
+ * Configure whether to use immediate writes (i.e. write operations such as
+ * {@link RedisCacheWriter#put(String, byte[], byte[], Duration)} or {@link #clear(String, byte[])}) shall apply
+ * immediately.
+ *
+ * Several {@link org.springframework.cache.Cache} operations can be performed asynchronously or deferred and this
+ * is the default behavior for {@link RedisCacheWriter}. Enable immediate writes in case a particular cache requires
+ * stronger consistency (i.e. Cache writes must be visible immediately).
+ *
+ * When using a {@link org.springframework.data.redis.connection.ReactiveRedisConnectionFactory reactive Redis
+ * driver}, immediate writes lead to blocking.
+ *
+ * @param enableImmediateWrites whether write operations must be visible immediately.
+ */
+ RedisCacheWriterConfigurer immediateWrites(boolean enableImmediateWrites);
+
+ }
+
+ /**
+ * Interface that allows for configuring cache locking.
+ *
+ * @author Mark Paluch
+ * @since 4.0
+ */
+ interface CacheLockingConfigurer {
+
+ /**
+ * Disable cache locking (default).
+ */
+ void disable();
+
+ /**
+ * Enable cache locking with a default sleep time of {@code 50 milliseconds} and persistent lock keys.
+ */
+ default void enable() {
+ enable(it -> {});
+ }
+
+ /**
+ * Enable cache locking.
+ */
+ void enable(Consumer configurationConsumer);
+
+ }
+
+ /**
+ * Interface that allows for configuring cache locking options.
+ *
+ * @author Mark Paluch
+ * @since 4.0
+ */
+ interface CacheLockingConfiguration {
+
+ /**
+ * Configure the sleep time between cache lock checks. Sleep time is applied to reattempt lock checks if a cache key
+ * is locked.
+ *
+ * @param sleepTime the sleep time, must not be {@literal null} and must be greater {@link Duration#ZERO}.
+ */
+ CacheLockingConfiguration sleepTime(Duration sleepTime);
+
+ /**
+ * Configure a {@link TtlFunction} to compute the lock timeout.
+ *
+ * If no TTL function is specified, the RedisCacheWriter persistent lock keys. Persistent lock keys need to be
+ * removed in case of failures (e.g. Redis crashes before a lock key is removed). Expiring lock keys can become
+ * subject to GC timing if lock keys expire while a garbage collection halts the JVM.
+ *
+ * @param ttlFunction the lock timeout function.
+ */
+ CacheLockingConfiguration lockTimeout(TtlFunction ttlFunction);
+
+ }
+
/**
* Function to compute the time to live from the cache {@code key} and {@code value}.
*
@@ -299,9 +526,9 @@ static TtlFunction just(Duration duration) {
}
/**
- * Returns a {@link TtlFunction} to create persistent entires that do not expire.
+ * Returns a {@link TtlFunction} to create persistent entries that do not expire.
*
- * @return a {@link TtlFunction} to create persistent entires that do not expire.
+ * @return a {@link TtlFunction} to create persistent entries that do not expire.
*/
static TtlFunction persistent() {
return just(NO_EXPIRATION);
@@ -323,4 +550,5 @@ static TtlFunction persistent() {
Duration getTimeToLive(Object key, @Nullable Object value);
}
+
}
diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java
index 0e89ee752c..2d45917f91 100644
--- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java
@@ -15,10 +15,23 @@
*/
package org.springframework.data.redis.cache;
-import static org.assertj.core.api.Assertions.*;
-import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.Mockito.*;
-
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatException;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.withSettings;
+
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;
import java.time.Duration;
import org.junit.jupiter.api.BeforeEach;
@@ -26,8 +39,11 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-
+import org.mockito.quality.Strictness;
import org.springframework.dao.PessimisticLockingFailureException;
+import org.springframework.data.redis.connection.ReactiveRedisConnection;
+import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
+import org.springframework.data.redis.connection.ReactiveStringCommands;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisKeyCommands;
@@ -127,4 +143,62 @@ void mustNotUnlockWhenLockingFails() {
verify(mockKeyCommands, never()).del(any());
}
+
+ @Test // GH-3236
+ void usesAsyncPutIfPossible() {
+
+ byte[] key = "TestKey".getBytes();
+ byte[] value = "TestValue".getBytes();
+
+ RedisConnectionFactory connectionFactory = mock(RedisConnectionFactory.class,
+ withSettings().extraInterfaces(ReactiveRedisConnectionFactory.class));
+ ReactiveRedisConnection mockConnection = mock(ReactiveRedisConnection.class);
+ ReactiveStringCommands mockStringCommands = mock(ReactiveStringCommands.class);
+
+ doReturn(mockConnection).when((ReactiveRedisConnectionFactory) connectionFactory).getReactiveConnection();
+ doReturn(mockStringCommands).when(mockConnection).stringCommands();
+ doReturn(Mono.just(value)).when(mockStringCommands).set(any(), any(), any(), any());
+
+ RedisCacheWriter cacheWriter = RedisCacheWriter.create(connectionFactory, cfg -> {
+ cfg.immediateWrites(false);
+ });
+
+ cacheWriter.put("TestCache", key, value, null);
+
+ verify(mockConnection, times(1)).stringCommands();
+ verify(mockStringCommands, times(1)).set(eq(ByteBuffer.wrap(key)), any());
+ }
+
+ @Test // GH-3236
+ void usesBlockingWritesIfConfiguredWithImmediateWritesEnabled() {
+
+ byte[] key = "TestKey".getBytes();
+ byte[] value = "TestValue".getBytes();
+
+ RedisConnectionFactory connectionFactory = mock(RedisConnectionFactory.class,
+ withSettings().strictness(Strictness.LENIENT).extraInterfaces(ReactiveRedisConnectionFactory.class));
+ ReactiveRedisConnection reactiveMockConnection = mock(ReactiveRedisConnection.class,
+ withSettings().strictness(Strictness.LENIENT));
+ ReactiveStringCommands reactiveMockStringCommands = mock(ReactiveStringCommands.class,
+ withSettings().strictness(Strictness.LENIENT));
+
+ doReturn(reactiveMockConnection).when((ReactiveRedisConnectionFactory) connectionFactory).getReactiveConnection();
+ doReturn(reactiveMockStringCommands).when(reactiveMockConnection).stringCommands();
+
+ RedisStringCommands mockStringCommands = mock(RedisStringCommands.class);
+
+ doReturn(mockStringCommands).when(this.mockConnection).stringCommands();
+ doReturn(this.mockConnection).when(connectionFactory).getConnection();
+
+ RedisCacheWriter cacheWriter = RedisCacheWriter.create(connectionFactory, cfg -> {
+ cfg.immediateWrites(true);
+ });
+
+ cacheWriter.put("TestCache", key, value, null);
+
+ verify(this.mockConnection, times(1)).stringCommands();
+ verify(mockStringCommands, times(1)).set(eq(key), any());
+ verify(reactiveMockConnection, never()).stringCommands();
+ verify(reactiveMockStringCommands, never()).set(eq(ByteBuffer.wrap(key)), any());
+ }
}
diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java
index 4c4a55a37c..265bc7a18a 100644
--- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java
+++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java
@@ -15,10 +15,13 @@
*/
package org.springframework.data.redis.cache;
-import static org.assertj.core.api.Assertions.*;
-import static org.springframework.data.redis.cache.RedisCacheWriter.*;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assume.assumeTrue;
+import static org.springframework.data.redis.cache.RedisCacheWriter.RedisCacheWriterConfigurer;
+import static org.springframework.data.redis.cache.RedisCacheWriter.TtlFunction;
+import static org.springframework.data.redis.cache.RedisCacheWriter.lockingRedisCacheWriter;
+import static org.springframework.data.redis.cache.RedisCacheWriter.nonLockingRedisCacheWriter;
-import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
@@ -31,14 +34,18 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.stream.Stream;
+import org.awaitility.Awaitility;
import org.jspecify.annotations.Nullable;
+import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
@@ -76,6 +83,10 @@ public static Collection