diff --git a/pom.xml b/pom.xml index 0e3024e099..fe25245089 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 4.0.0-SNAPSHOT + 4.0.0-GH-3236-SNAPSHOT Spring Data Redis Spring Data module for Redis diff --git a/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java b/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java index f84bc697aa..f90af59445 100644 --- a/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java +++ b/src/main/java/org/springframework/data/redis/cache/BatchStrategies.java @@ -98,13 +98,7 @@ public long cleanCache(RedisConnection connection, String name, byte[] pattern) /** * {@link BatchStrategy} using {@code SCAN}. */ - static class Scan implements BatchStrategy { - - private final int batchSize; - - Scan(int batchSize) { - this.batchSize = batchSize; - } + record Scan(int batchSize) implements BatchStrategy { @Override public long cleanCache(RedisConnection connection, String name, byte[] pattern) { diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index b18991d29d..600cb4f0b7 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -27,9 +27,12 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.jspecify.annotations.Nullable; + import org.springframework.dao.PessimisticLockingFailureException; +import org.springframework.data.redis.connection.ReactiveKeyCommands; import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.ReactiveStringCommands; @@ -37,6 +40,7 @@ import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.connection.RedisStringCommands.SetOption; +import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.types.Expiration; import org.springframework.data.redis.util.ByteUtils; import org.springframework.util.Assert; @@ -80,6 +84,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { private final AsyncCacheWriter asyncCacheWriter; + private final boolean asynchronousWrites; + /** * @param connectionFactory must not be {@literal null}. * @param batchStrategy must not be {@literal null}. @@ -95,19 +101,11 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { * @param batchStrategy must not be {@literal null}. */ DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, BatchStrategy batchStrategy) { - this(connectionFactory, sleepTime, TtlFunction.persistent(), CacheStatisticsCollector.none(), batchStrategy); + this(connectionFactory, sleepTime, TtlFunction.persistent(), CacheStatisticsCollector.none(), batchStrategy, true); } - /** - * @param connectionFactory must not be {@literal null}. - * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO} - * to disable locking. - * @param lockTtl Lock TTL function must not be {@literal null}. - * @param cacheStatisticsCollector must not be {@literal null}. - * @param batchStrategy must not be {@literal null}. - */ DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, TtlFunction lockTtl, - CacheStatisticsCollector cacheStatisticsCollector, BatchStrategy batchStrategy) { + CacheStatisticsCollector cacheStatisticsCollector, BatchStrategy batchStrategy, boolean asynchronousWrites) { Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); Assert.notNull(sleepTime, "SleepTime must not be null"); @@ -122,12 +120,117 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { this.batchStrategy = batchStrategy; if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this.connectionFactory instanceof ReactiveRedisConnectionFactory) { - asyncCacheWriter = new AsynchronousCacheWriterDelegate(); + this.asyncCacheWriter = new AsynchronousCacheWriterDelegate(); + this.asynchronousWrites = asynchronousWrites; } else { asyncCacheWriter = UnsupportedAsyncCacheWriter.INSTANCE; + this.asynchronousWrites = false; } } + /** + * Create a new {@code DefaultRedisCacheWriter} applying configuration through {@code configurerConsumer}. + * + * @param connectionFactory the connection factory to use. + * @param configurerConsumer configuration consumer. + * @return a new {@code DefaultRedisCacheWriter}. + * @since 4.0 + */ + public static DefaultRedisCacheWriter create(RedisConnectionFactory connectionFactory, + Consumer configurerConsumer) { + + Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null"); + Assert.notNull(configurerConsumer, "RedisCacheWriterConfigurer function must not be null"); + + DefaultRedisCacheWriterConfigurer config = new DefaultRedisCacheWriterConfigurer(); + configurerConsumer.accept(config); + + return new DefaultRedisCacheWriter(connectionFactory, config.lockSleepTime, config.lockTtlFunction, + config.cacheStatisticsCollector, config.batchStrategy, !config.immediateWrites); + } + + static class DefaultRedisCacheWriterConfigurer + implements RedisCacheWriterConfigurer, CacheLockingConfigurer, CacheLockingConfiguration { + + CacheStatisticsCollector cacheStatisticsCollector = CacheStatisticsCollector.none(); + BatchStrategy batchStrategy = BatchStrategies.keys(); + Duration lockSleepTime = Duration.ZERO; + TtlFunction lockTtlFunction = TtlFunction.persistent(); + boolean immediateWrites = false; + + @Override + public RedisCacheWriterConfigurer collectStatistics(CacheStatisticsCollector cacheStatisticsCollector) { + + Assert.notNull(cacheStatisticsCollector, "CacheStatisticsCollector must not be null"); + this.cacheStatisticsCollector = cacheStatisticsCollector; + + return this; + } + + @Override + public RedisCacheWriterConfigurer batchStrategy(BatchStrategy batchStrategy) { + + Assert.notNull(batchStrategy, "BatchStrategy must not be null"); + this.batchStrategy = batchStrategy; + + return this; + } + + @Override + public RedisCacheWriterConfigurer cacheLocking(Consumer configurerConsumer) { + + Assert.notNull(configurerConsumer, "CacheLockingConfigurer function must not be null"); + configurerConsumer.accept(this); + + return this; + } + + @Override + public RedisCacheWriterConfigurer immediateWrites(boolean enableImmediateWrites) { + + this.immediateWrites = enableImmediateWrites; + return this; + } + + @Override + public void disable() { + this.lockSleepTime = Duration.ZERO; + } + + @Override + public void enable(Consumer configurationConsumer) { + + Assert.notNull(configurationConsumer, "CacheLockingConfigurer function must not be null"); + + if (this.lockSleepTime.isZero() || this.lockSleepTime.isNegative()) { + this.lockSleepTime = Duration.ofMillis(50); + } + configurationConsumer.accept(this); + } + + @Override + public CacheLockingConfiguration sleepTime(Duration sleepTime) { + + Assert.notNull(sleepTime, "Lock sleep time must not be null"); + Assert.isTrue(!sleepTime.isZero() && !sleepTime.isNegative(), + "Lock sleep time must not be null zero or negative"); + + this.lockSleepTime = sleepTime; + + return this; + } + + @Override + public CacheLockingConfiguration lockTimeout(TtlFunction ttlFunction) { + + Assert.notNull(ttlFunction, "TTL function must not be null"); + this.lockTtlFunction = ttlFunction; + + return this; + } + + } + @Override public byte @Nullable [] get(String name, byte[] key) { return get(name, key, null); @@ -206,6 +309,10 @@ public boolean supportsAsyncRetrieve() { return asyncCacheWriter.isSupported(); } + private boolean writeAsynchronously() { + return supportsAsyncRetrieve() && asynchronousWrites; + } + @Override public CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl) { @@ -234,11 +341,14 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) { Assert.notNull(key, "Key must not be null"); Assert.notNull(value, "Value must not be null"); - execute(name, connection -> { - doPut(connection, name, key, value, ttl); - return "OK"; - }); - + if (writeAsynchronously()) { + asyncCacheWriter.store(name, key, value, ttl).thenRun(() -> statistics.incPuts(name)); + } else { + execute(name, connection -> { + doPut(connection, name, key, value, ttl); + return "OK"; + }); + } } @SuppressWarnings("NullAway") @@ -306,22 +416,49 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat } @Override - public void remove(String name, byte[] key) { + public void evict(String name, byte[] key) { Assert.notNull(name, "Name must not be null"); Assert.notNull(key, "Key must not be null"); - execute(name, connection -> connection.keyCommands().del(key)); + if (writeAsynchronously()) { + asyncCacheWriter.remove(name, key).thenRun(() -> statistics.incDeletes(name)); + } else { + evictIfPresent(name, key); + } + } + + @Override + public boolean evictIfPresent(String name, byte[] key) { + + Long removals = execute(name, connection -> connection.keyCommands().del(key)); statistics.incDeletes(name); + + return removals > 0; } @Override - public void clean(String name, byte[] pattern) { + public void clear(String name, byte[] pattern) { Assert.notNull(name, "Name must not be null"); Assert.notNull(pattern, "Pattern must not be null"); - execute(name, connection -> { + if (writeAsynchronously()) { + asyncCacheWriter.clear(name, pattern, batchStrategy) + .thenAccept(deleteCount -> statistics.incDeletesBy(name, deleteCount.intValue())); + return; + } + + invalidate(name, pattern); + } + + @Override + public boolean invalidate(String name, byte[] pattern) { + + Assert.notNull(name, "Name must not be null"); + Assert.notNull(pattern, "Pattern must not be null"); + + return execute(name, connection -> { try { if (isLockingCacheWriter()) { @@ -337,13 +474,12 @@ public void clean(String name, byte[] pattern) { statistics.incDeletesBy(name, (int) deleteCount); + return deleteCount > 0; } finally { if (isLockingCacheWriter()) { doUnlock(name, connection); } } - - return "OK"; }); } @@ -360,7 +496,7 @@ public void clearStatistics(String name) { @Override public RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheStatisticsCollector) { return new DefaultRedisCacheWriter(connectionFactory, sleepTime, lockTtl, cacheStatisticsCollector, - this.batchStrategy); + this.batchStrategy, this.asynchronousWrites); } /** @@ -499,6 +635,26 @@ interface AsyncCacheWriter { */ CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl); + /** + * Remove a cache entry asynchronously. + * + * @param name the cache name which to store the cache entry to. + * @param key the key for the cache entry. Must not be {@literal null}. + * @return a future that signals completion. + */ + CompletableFuture remove(String name, byte[] key); + + /** + * Clear the cache asynchronously. + * + * @param name the cache name which to store the cache entry to. + * @param pattern {@link String pattern} used to match Redis keys to clear. + * @param batchStrategy strategy to use. + * @return a future that signals completion emitting the number of removed keys. + * @since 4.0 + */ + CompletableFuture clear(String name, byte[] pattern, BatchStrategy batchStrategy); + } /** @@ -524,6 +680,17 @@ public CompletableFuture retrieve(String name, byte[] key, @Nullable Dur public CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl) { throw new UnsupportedOperationException("async store not supported"); } + + @Override + public CompletableFuture remove(String name, byte[] key) { + throw new UnsupportedOperationException("async remove not supported"); + } + + @Override + public CompletableFuture clear(String name, byte[] pattern, BatchStrategy batchStrategy) { + throw new UnsupportedOperationException("async clean not supported"); + } + } /** @@ -534,6 +701,14 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul */ class AsynchronousCacheWriterDelegate implements AsyncCacheWriter { + private static final int DEFAULT_SCAN_BATCH_SIZE = 64; + private final int clearBatchSize; + + public AsynchronousCacheWriterDelegate() { + this.clearBatchSize = batchStrategy instanceof BatchStrategies.Scan scan ? scan.batchSize() + : DEFAULT_SCAN_BATCH_SIZE; + } + @Override public boolean isSupported() { return true; @@ -552,7 +727,7 @@ public CompletableFuture retrieve(String name, byte[] key, @Nullable Dur Mono get = shouldExpireWithin(ttl) ? stringCommands.getEx(wrappedKey, Expiration.from(ttl)) : stringCommands.get(wrappedKey); - return cacheLockCheck.then(get).map(ByteUtils::getBytes).toFuture(); + return cacheLockCheck.then(get).map(ByteUtils::getBytes); }); } @@ -561,20 +736,12 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul return doWithConnection(connection -> { - Mono mono = isLockingCacheWriter() ? doStoreWithLocking(name, key, value, ttl, connection) - : doStore(key, value, ttl, connection); + Mono mono = doWithLocking(name, key, value, connection, () -> doStore(key, value, ttl, connection)); - return mono.then().toFuture(); + return mono.then(); }); } - private Mono doStoreWithLocking(String name, byte[] key, byte[] value, @Nullable Duration ttl, - ReactiveRedisConnection connection) { - - return Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection), - unused -> doUnlock(name, connection)); - } - @SuppressWarnings("NullAway") private Mono doStore(byte[] cacheKey, byte[] value, @Nullable Duration ttl, ReactiveRedisConnection connection) { @@ -590,6 +757,59 @@ private Mono doStore(byte[] cacheKey, byte[] value, @Nullable Duration } } + @Override + public CompletableFuture remove(String name, byte[] key) { + + return doWithConnection(connection -> { + return doWithLocking(name, key, null, connection, () -> doRemove(key, connection)).then(); + }); + } + + @Override + public CompletableFuture clear(String name, byte[] pattern, BatchStrategy batchStrategy) { + + return doWithConnection(connection -> { + return doWithLocking(name, pattern, null, connection, () -> doClear(pattern, connection)); + }); + } + + private Mono doClear(byte[] pattern, ReactiveRedisConnection connection) { + + ReactiveKeyCommands commands = connection.keyCommands(); + + Flux keys; + + if (batchStrategy instanceof BatchStrategies.Keys) { + keys = commands.keys(ByteBuffer.wrap(pattern)).flatMapMany(Flux::fromIterable); + } else { + keys = commands.scan(ScanOptions.scanOptions().count(clearBatchSize).match(pattern).build()); + } + + return keys + .buffer(clearBatchSize) // + .flatMap(commands::mUnlink) // + .collect(Collectors.summingLong(Long::longValue)); + } + + @SuppressWarnings("NullAway") + private Mono doRemove(byte[] cacheKey, ReactiveRedisConnection connection) { + + ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey); + + return connection.keyCommands().unlink(wrappedKey); + } + + private Mono doWithLocking(String name, byte[] key, byte @Nullable [] value, + ReactiveRedisConnection connection, Supplier> action) { + + if (isLockingCacheWriter()) { + return Mono.usingWhen(doLock(name, key, value, connection), unused -> action.get(), + unused -> doUnlock(name, connection)); + } + + return action.get(); + } + private Mono doLock(String name, Object contextualKey, @Nullable Object contextualValue, ReactiveRedisConnection connection) { @@ -622,14 +842,16 @@ private Mono waitForLock(ReactiveRedisConnection connection, String cacheN } private CompletableFuture doWithConnection( - Function> callback) { + Function> callback) { ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory) connectionFactory; return Mono.usingWhen(Mono.fromSupplier(cf::getReactiveConnection), // - it -> Mono.fromCompletionStage(callback.apply(it)), // + callback::apply, // ReactiveRedisConnection::closeLater) // .toFuture(); } + } + } diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCache.java b/src/main/java/org/springframework/data/redis/cache/RedisCache.java index a2ec676b5a..c3137b3d9b 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCache.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCache.java @@ -235,9 +235,12 @@ public void clear() { } /** - * Clear keys that match the given {@link String keyPattern}. + * Clear keys that match the given {@link String keyPattern}. Useful when cache keys are formatted in a style where + * Redis patterns can be used for matching these. *

- * Useful when cache keys are formatted in a style where Redis patterns can be used for matching these. + * Actual clearing may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still + * seeing the entries. This may for example be the case with transactional cache decorators. Use {@link #invalidate()} + * for guaranteed immediate removal of entries. * * @param keyPattern {@link String pattern} used to match Redis keys to clear. * @since 3.0 @@ -246,6 +249,11 @@ public void clear(String keyPattern) { getCacheWriter().clean(getName(), createAndConvertCacheKey(keyPattern)); } + @Override + public boolean invalidate() { + return getCacheWriter().invalidate(getName(), createAndConvertCacheKey("*")); + } + /** * Reset all statistics counters and gauges for this cache. * @@ -257,7 +265,12 @@ public void clearStatistics() { @Override public void evict(Object key) { - getCacheWriter().remove(getName(), createAndConvertCacheKey(key)); + getCacheWriter().evict(getName(), createAndConvertCacheKey(key)); + } + + @Override + public boolean evictIfPresent(Object key) { + return getCacheWriter().evictIfPresent(getName(), createAndConvertCacheKey(key)); } @Override @@ -480,7 +493,7 @@ private String convertCollectionLikeOrMapKey(Object key, TypeDescriptor source) throw new IllegalArgumentException("Cannot convert cache key [%s] to String".formatted(key)); } - private byte[] createAndConvertCacheKey(Object key) { + byte[] createAndConvertCacheKey(Object key) { return serializeCacheKey(createCacheKey(key)); } diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java index ef63003afc..6633817888 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java @@ -17,6 +17,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.Supplier; import org.jspecify.annotations.Nullable; @@ -41,7 +42,21 @@ public interface RedisCacheWriter extends CacheStatisticsProvider { /** - * Create new {@link RedisCacheWriter} without locking behavior. + * Create new {@link RedisCacheWriter} configure it through {@link RedisCacheWriterConfigurer}. The cache writer + * defaults does not lock the cache by default using {@link BatchStrategies#keys()}. + * + * @param connectionFactory the connection factory to use. + * @param configurerConsumer a configuration function that configures {@link RedisCacheWriterConfigurer}. + * @return new instance of {@link DefaultRedisCacheWriter}. + * @since 4.0 + */ + static RedisCacheWriter create(RedisConnectionFactory connectionFactory, + Consumer configurerConsumer) { + return DefaultRedisCacheWriter.create(connectionFactory, configurerConsumer); + } + + /** + * Create new {@link RedisCacheWriter} without locking behavior using {@link BatchStrategies#keys()}. * * @param connectionFactory must not be {@literal null}. * @return new instance of {@link DefaultRedisCacheWriter}. @@ -60,15 +75,11 @@ static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connec */ static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy) { - - Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); - Assert.notNull(batchStrategy, "BatchStrategy must not be null"); - - return new DefaultRedisCacheWriter(connectionFactory, batchStrategy); + return create(connectionFactory, config -> config.batchStrategy(batchStrategy)); } /** - * Create new {@link RedisCacheWriter} with locking behavior. + * Create new {@link RedisCacheWriter} with locking behavior using {@link BatchStrategies#keys()}. * * @param connectionFactory must not be {@literal null}. * @return new instance of {@link DefaultRedisCacheWriter}. @@ -88,7 +99,8 @@ static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectio static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy) { - return lockingRedisCacheWriter(connectionFactory, Duration.ofMillis(50), TtlFunction.persistent(), batchStrategy); + return create(connectionFactory, + it -> it.batchStrategy(batchStrategy).cacheLocking(CacheLockingConfigurer::enable)); } /** @@ -105,10 +117,8 @@ static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectio static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, TtlFunction lockTtlFunction, BatchStrategy batchStrategy) { - Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); - - return new DefaultRedisCacheWriter(connectionFactory, sleepTime, lockTtlFunction, CacheStatisticsCollector.none(), - batchStrategy); + return create(connectionFactory, it -> it.batchStrategy(batchStrategy) + .enableLocking(locking -> locking.sleepTime(sleepTime).lockTimeout(lockTtlFunction))); } /** @@ -244,19 +254,86 @@ default CompletableFuture retrieve(String name, byte[] key) { /** * Remove the given key from Redis. + *

+ * Actual eviction may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still + * seeing the entry. + * + * @param name cache name must not be {@literal null}. + * @param key key for the cache entry. Must not be {@literal null}. + * @deprecated since 4.0 in favor of {@link #evict(String, byte[])} + */ + @Deprecated(since = "4.0", forRemoval = true) + default void remove(String name, byte[] key) { + evict(name, key); + } + + /** + * Remove the given key from Redis. + *

+ * Actual eviction may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still + * seeing the entry. + * + * @param name cache name must not be {@literal null}. + * @param key key for the cache entry. Must not be {@literal null}. + * @since 4.0 + */ + void evict(String name, byte[] key); + + /** + * Remove the given key from Redis if it is present, expecting the key to be immediately invisible for subsequent + * lookups. * * @param name cache name must not be {@literal null}. * @param key key for the cache entry. Must not be {@literal null}. + * @return {@code true} if the cache was known to have a mapping for this key before, {@code false} if it did not (or + * if prior presence could not be determined). + * @since 4.0 + */ + default boolean evictIfPresent(String name, byte[] key) { + evict(name, key); + return false; + } + + /** + * Remove all keys following the given pattern. + *

+ * Actual clearing may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still + * seeing the entries. + * + * @param name cache name must not be {@literal null}. + * @param pattern pattern for the keys to remove. Must not be {@literal null}. + * @deprecated since 4.0 in favor of {@link #clear(String, byte[])} */ - void remove(String name, byte[] key); + @Deprecated(since = "4.0", forRemoval = true) + default void clean(String name, byte[] pattern) { + clear(name, pattern); + } /** * Remove all keys following the given pattern. + *

+ * Actual clearing may be performed in an asynchronous or deferred fashion, with subsequent lookups possibly still + * seeing the entries. + * + * @param name cache name must not be {@literal null}. + * @param pattern pattern for the keys to remove. Must not be {@literal null}. + * @since 4.0 + */ + void clear(String name, byte[] pattern); + + /** + * Remove all keys following the given pattern expecting all entries to be immediately invisible for subsequent + * lookups. * * @param name cache name must not be {@literal null}. * @param pattern pattern for the keys to remove. Must not be {@literal null}. + * @return {@code true} if the cache was known to have mappings before, {@code false} if it did not (or if prior + * presence of entries could not be determined). */ - void clean(String name, byte[] pattern); + default boolean invalidate(String name, byte[] pattern) { + clear(name, pattern); + return false; + } /** * Reset all statistics counters and gauges for this cache. @@ -273,6 +350,156 @@ default CompletableFuture retrieve(String name, byte[] key) { */ RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheStatisticsCollector); + /** + * Interface that allows for configuring a {@link RedisCacheWriter}. + * + * @author Mark Paluch + * @since 4.0 + */ + interface RedisCacheWriterConfigurer { + + /** + * Configure the {@link CacheStatisticsCollector} to use. This is useful for plugging in and/or customizing + * statistics collection. + */ + default RedisCacheWriterConfigurer collectStatistics() { + return collectStatistics(CacheStatisticsCollector.create()); + } + + /** + * Configure the {@link CacheStatisticsCollector} to use. This is useful for plugging in and/or customizing + * statistics collection. + *

+ * If no statistics collector is specified, no statistics will be recorded. Statistics collection can be + * reconfigured on the built RedisCacheWriter by invoking + * {@code RedisCacheWriter#withStatisticsCollector(CacheStatisticsCollector)}. + * + * @param cacheStatisticsCollector the statistics collector to use. + */ + RedisCacheWriterConfigurer collectStatistics(CacheStatisticsCollector cacheStatisticsCollector); + + /** + * Configure the {@link BatchStrategy} when clearing the cache (i.e. bulk removal of cache keys). + *

+ * If no batch strategy is specified, the RedisCacheWriter uses {@link BatchStrategies#keys()}; + * + * @param batchStrategy the batch strategy to use. + */ + RedisCacheWriterConfigurer batchStrategy(BatchStrategy batchStrategy); + + /** + * Enable cache locking to synchronize cache access across multiple cache instances. + */ + default RedisCacheWriterConfigurer enableLocking() { + return cacheLocking(it -> it.enable(config -> {})); + } + + /** + * Enable cache locking to synchronize cache access across multiple cache instances. + * + * @param configurerConsumer a configuration function that configures {@link CacheLockingConfiguration}. + */ + default RedisCacheWriterConfigurer enableLocking(Consumer configurerConsumer) { + return cacheLocking(it -> it.enable(configurerConsumer)); + } + + /** + * Configure cache locking to synchronize cache access across multiple cache instances. + * + * @param configurerConsumer a configuration function that configures {@link CacheLockingConfigurer}. + */ + RedisCacheWriterConfigurer cacheLocking(Consumer configurerConsumer); + + /** + * Use immediate writes (i.e. write operations such as + * {@link RedisCacheWriter#put(String, byte[], byte[], Duration)} or {@link #clear(String, byte[])}) shall apply + * immediately. + *

+ * Several {@link org.springframework.cache.Cache} operations can be performed asynchronously or deferred and this + * is the default behavior for {@link RedisCacheWriter}. Enable immediate writes in case a particular cache requires + * stronger consistency (i.e. Cache writes must be visible immediately). + *

+ * When using a {@link org.springframework.data.redis.connection.ReactiveRedisConnectionFactory reactive Redis + * driver}, immediate writes lead to blocking. + */ + default RedisCacheWriterConfigurer immediateWrites() { + return immediateWrites(true); + } + + /** + * Configure whether to use immediate writes (i.e. write operations such as + * {@link RedisCacheWriter#put(String, byte[], byte[], Duration)} or {@link #clear(String, byte[])}) shall apply + * immediately. + *

+ * Several {@link org.springframework.cache.Cache} operations can be performed asynchronously or deferred and this + * is the default behavior for {@link RedisCacheWriter}. Enable immediate writes in case a particular cache requires + * stronger consistency (i.e. Cache writes must be visible immediately). + *

+ * When using a {@link org.springframework.data.redis.connection.ReactiveRedisConnectionFactory reactive Redis + * driver}, immediate writes lead to blocking. + * + * @param enableImmediateWrites whether write operations must be visible immediately. + */ + RedisCacheWriterConfigurer immediateWrites(boolean enableImmediateWrites); + + } + + /** + * Interface that allows for configuring cache locking. + * + * @author Mark Paluch + * @since 4.0 + */ + interface CacheLockingConfigurer { + + /** + * Disable cache locking (default). + */ + void disable(); + + /** + * Enable cache locking with a default sleep time of {@code 50 milliseconds} and persistent lock keys. + */ + default void enable() { + enable(it -> {}); + } + + /** + * Enable cache locking. + */ + void enable(Consumer configurationConsumer); + + } + + /** + * Interface that allows for configuring cache locking options. + * + * @author Mark Paluch + * @since 4.0 + */ + interface CacheLockingConfiguration { + + /** + * Configure the sleep time between cache lock checks. Sleep time is applied to reattempt lock checks if a cache key + * is locked. + * + * @param sleepTime the sleep time, must not be {@literal null} and must be greater {@link Duration#ZERO}. + */ + CacheLockingConfiguration sleepTime(Duration sleepTime); + + /** + * Configure a {@link TtlFunction} to compute the lock timeout. + *

+ * If no TTL function is specified, the RedisCacheWriter persistent lock keys. Persistent lock keys need to be + * removed in case of failures (e.g. Redis crashes before a lock key is removed). Expiring lock keys can become + * subject to GC timing if lock keys expire while a garbage collection halts the JVM. + * + * @param ttlFunction the lock timeout function. + */ + CacheLockingConfiguration lockTimeout(TtlFunction ttlFunction); + + } + /** * Function to compute the time to live from the cache {@code key} and {@code value}. * @@ -299,9 +526,9 @@ static TtlFunction just(Duration duration) { } /** - * Returns a {@link TtlFunction} to create persistent entires that do not expire. + * Returns a {@link TtlFunction} to create persistent entries that do not expire. * - * @return a {@link TtlFunction} to create persistent entires that do not expire. + * @return a {@link TtlFunction} to create persistent entries that do not expire. */ static TtlFunction persistent() { return just(NO_EXPIRATION); @@ -323,4 +550,5 @@ static TtlFunction persistent() { Duration getTimeToLive(Object key, @Nullable Object value); } + } diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java index 0e89ee752c..2d45917f91 100644 --- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java +++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCachWriterUnitTests.java @@ -15,10 +15,23 @@ */ package org.springframework.data.redis.cache; -import static org.assertj.core.api.Assertions.*; -import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.*; - +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.withSettings; + +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; import java.time.Duration; import org.junit.jupiter.api.BeforeEach; @@ -26,8 +39,11 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; - +import org.mockito.quality.Strictness; import org.springframework.dao.PessimisticLockingFailureException; +import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.connection.ReactiveStringCommands; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisKeyCommands; @@ -127,4 +143,62 @@ void mustNotUnlockWhenLockingFails() { verify(mockKeyCommands, never()).del(any()); } + + @Test // GH-3236 + void usesAsyncPutIfPossible() { + + byte[] key = "TestKey".getBytes(); + byte[] value = "TestValue".getBytes(); + + RedisConnectionFactory connectionFactory = mock(RedisConnectionFactory.class, + withSettings().extraInterfaces(ReactiveRedisConnectionFactory.class)); + ReactiveRedisConnection mockConnection = mock(ReactiveRedisConnection.class); + ReactiveStringCommands mockStringCommands = mock(ReactiveStringCommands.class); + + doReturn(mockConnection).when((ReactiveRedisConnectionFactory) connectionFactory).getReactiveConnection(); + doReturn(mockStringCommands).when(mockConnection).stringCommands(); + doReturn(Mono.just(value)).when(mockStringCommands).set(any(), any(), any(), any()); + + RedisCacheWriter cacheWriter = RedisCacheWriter.create(connectionFactory, cfg -> { + cfg.immediateWrites(false); + }); + + cacheWriter.put("TestCache", key, value, null); + + verify(mockConnection, times(1)).stringCommands(); + verify(mockStringCommands, times(1)).set(eq(ByteBuffer.wrap(key)), any()); + } + + @Test // GH-3236 + void usesBlockingWritesIfConfiguredWithImmediateWritesEnabled() { + + byte[] key = "TestKey".getBytes(); + byte[] value = "TestValue".getBytes(); + + RedisConnectionFactory connectionFactory = mock(RedisConnectionFactory.class, + withSettings().strictness(Strictness.LENIENT).extraInterfaces(ReactiveRedisConnectionFactory.class)); + ReactiveRedisConnection reactiveMockConnection = mock(ReactiveRedisConnection.class, + withSettings().strictness(Strictness.LENIENT)); + ReactiveStringCommands reactiveMockStringCommands = mock(ReactiveStringCommands.class, + withSettings().strictness(Strictness.LENIENT)); + + doReturn(reactiveMockConnection).when((ReactiveRedisConnectionFactory) connectionFactory).getReactiveConnection(); + doReturn(reactiveMockStringCommands).when(reactiveMockConnection).stringCommands(); + + RedisStringCommands mockStringCommands = mock(RedisStringCommands.class); + + doReturn(mockStringCommands).when(this.mockConnection).stringCommands(); + doReturn(this.mockConnection).when(connectionFactory).getConnection(); + + RedisCacheWriter cacheWriter = RedisCacheWriter.create(connectionFactory, cfg -> { + cfg.immediateWrites(true); + }); + + cacheWriter.put("TestCache", key, value, null); + + verify(this.mockConnection, times(1)).stringCommands(); + verify(mockStringCommands, times(1)).set(eq(key), any()); + verify(reactiveMockConnection, never()).stringCommands(); + verify(reactiveMockStringCommands, never()).set(eq(ByteBuffer.wrap(key)), any()); + } } diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java index 4c4a55a37c..265bc7a18a 100644 --- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java +++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java @@ -15,10 +15,13 @@ */ package org.springframework.data.redis.cache; -import static org.assertj.core.api.Assertions.*; -import static org.springframework.data.redis.cache.RedisCacheWriter.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assume.assumeTrue; +import static org.springframework.data.redis.cache.RedisCacheWriter.RedisCacheWriterConfigurer; +import static org.springframework.data.redis.cache.RedisCacheWriter.TtlFunction; +import static org.springframework.data.redis.cache.RedisCacheWriter.lockingRedisCacheWriter; +import static org.springframework.data.redis.cache.RedisCacheWriter.nonLockingRedisCacheWriter; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; @@ -31,14 +34,18 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Stream; +import org.awaitility.Awaitility; import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedClass; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; - import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisStringCommands.SetOption; @@ -76,6 +83,10 @@ public static Collection testParams() { return CacheTestParams.justConnectionFactories(); } + static Stream clearRemovesAffectedKeysArgs() { + return Stream.of(Arguments.of(BatchStrategies.keys()), Arguments.of(BatchStrategies.scan(37))); + } + @BeforeEach void setUp() { doWithConnection(RedisConnection::flushAll); @@ -85,10 +96,10 @@ void setUp() { // DATAREDIS-481, DATAREDIS-1082 void putShouldAddEternalEntry() { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, + configurer -> configurer.immediateWrites().collectStatistics()); - writer.put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); + writer.putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); doWithConnection(connection -> { assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); @@ -102,8 +113,8 @@ void putShouldAddEternalEntry() { @Test // DATAREDIS-481 void putShouldAddExpiringEntry() { - nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue, - Duration.ofSeconds(1)); + RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::immediateWrites).putIfAbsent(CACHE_NAME, + binaryCacheKey, binaryCacheValue, Duration.ofSeconds(1)); doWithConnection(connection -> { assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); @@ -116,9 +127,26 @@ void putShouldOverwriteExistingEternalEntry() { doWithConnection(connection -> connection.set(binaryCacheKey, "foo".getBytes())); - nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); + RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::immediateWrites).put(CACHE_NAME, + binaryCacheKey, binaryCacheValue, Duration.ZERO); + + doWithConnection(connection -> { + assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); + assertThat(connection.ttl(binaryCacheKey)).isEqualTo(-1); + }); + } + + @Test // GH-3236 + void nonBlockingPutShouldWriteEntry() { + + RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory); + assumeTrue(writer.supportsAsyncRetrieve()); + + writer.put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); doWithConnection(connection -> { + + Awaitility.await().pollInSameThread().pollDelay(Duration.ZERO).until(() -> connection.exists(binaryCacheKey)); assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); assertThat(connection.ttl(binaryCacheKey)).isEqualTo(-1); }); @@ -130,8 +158,8 @@ void putShouldOverwriteExistingExpiringEntryAndResetTtl() { doWithConnection(connection -> connection.set(binaryCacheKey, "foo".getBytes(), Expiration.from(1, TimeUnit.MINUTES), SetOption.upsert())); - nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue, - Duration.ofSeconds(5)); + RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::immediateWrites).put(CACHE_NAME, + binaryCacheKey, binaryCacheValue, Duration.ofSeconds(5)); doWithConnection(connection -> { assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue); @@ -144,8 +172,8 @@ void getShouldReturnValue() { doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, + configurer -> configurer.immediateWrites().collectStatistics()); assertThat(writer.get(CACHE_NAME, binaryCacheKey)).isEqualTo(binaryCacheValue); assertThat(writer.getCacheStatistics(CACHE_NAME).getGets()).isOne(); @@ -164,8 +192,8 @@ void cacheHitRetrieveShouldIncrementStatistics() throws ExecutionException, Inte doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, + configurer -> configurer.immediateWrites().collectStatistics()); writer.retrieve(CACHE_NAME, binaryCacheKey).get(); @@ -177,8 +205,7 @@ void cacheHitRetrieveShouldIncrementStatistics() throws ExecutionException, Inte @EnabledOnRedisDriver(RedisDriver.LETTUCE) void storeShouldIncrementStatistics() throws ExecutionException, InterruptedException { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); writer.store(CACHE_NAME, binaryCacheKey, binaryCacheValue, null).get(); @@ -189,8 +216,7 @@ void storeShouldIncrementStatistics() throws ExecutionException, InterruptedExce @EnabledOnRedisDriver(RedisDriver.LETTUCE) void cacheMissRetrieveWithLoaderAsyncShouldIncrementStatistics() throws ExecutionException, InterruptedException { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); writer.retrieve(CACHE_NAME, binaryCacheKey).get(); @@ -201,8 +227,7 @@ void cacheMissRetrieveWithLoaderAsyncShouldIncrementStatistics() throws Executio @Test // DATAREDIS-481, DATAREDIS-1082 void putIfAbsentShouldAddEternalEntryWhenKeyDoesNotExist() { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); assertThat(writer.putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO)).isNull(); @@ -234,8 +259,7 @@ void putIfAbsentShouldNotAddEternalEntryWhenKeyAlreadyExist() { @Test // DATAREDIS-481, DATAREDIS-1082 void putIfAbsentShouldAddExpiringEntryWhenKeyDoesNotExist() { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); assertThat(writer.putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ofSeconds(5))).isNull(); @@ -249,8 +273,7 @@ void putIfAbsentShouldAddExpiringEntryWhenKeyDoesNotExist() { @Test // GH-2890 void getWithValueLoaderShouldStoreCacheValue() { - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); writer.get(CACHE_NAME, binaryCacheKey, () -> binaryCacheValue, Duration.ofSeconds(5), true); @@ -263,14 +286,44 @@ void getWithValueLoaderShouldStoreCacheValue() { } @Test // DATAREDIS-481, DATAREDIS-1082 - void removeShouldDeleteEntry() { + void evictShouldDeleteEntry() { doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, + config -> config.collectStatistics().immediateWrites()); - writer.remove(CACHE_NAME, binaryCacheKey); + writer.evict(CACHE_NAME, binaryCacheKey); + + doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isFalse()); + + assertThat(writer.getCacheStatistics(CACHE_NAME).getDeletes()).isOne(); + } + + @Test // GH-3236 + void evictShouldNonblockingDeleteEntry() { + + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); + assumeTrue(writer.supportsAsyncRetrieve()); + + writer.evict(CACHE_NAME, binaryCacheKey); + + doWithConnection(connection -> { + Awaitility.await().pollInSameThread().pollDelay(Duration.ZERO).until(() -> !connection.exists(binaryCacheKey)); + assertThat(connection.exists(binaryCacheKey)).isFalse(); + }); + + assertThat(writer.getCacheStatistics(CACHE_NAME).getDeletes()).isOne(); + } + + @Test // GH-3236 + void evictIfPresentShouldDeleteEntryIfExists() { + + doWithConnection(connection -> connection.set(binaryCacheKey, binaryCacheValue)); + + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); + + assertThat(writer.evictIfPresent(CACHE_NAME, binaryCacheKey)).isTrue(); doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isFalse()); @@ -278,17 +331,59 @@ void removeShouldDeleteEntry() { } @Test // DATAREDIS-418, DATAREDIS-1082 - void cleanShouldRemoveAllKeysByPattern() { + void clearShouldRemoveAllKeysByPattern() { doWithConnection(connection -> { connection.set(binaryCacheKey, binaryCacheValue); connection.set("foo".getBytes(), "bar".getBytes()); }); - RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, + config -> config.collectStatistics().immediateWrites()); + + writer.clear(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8)); + + doWithConnection(connection -> { + assertThat(connection.exists(binaryCacheKey)).isFalse(); + assertThat(connection.exists("foo".getBytes())).isTrue(); + }); + + assertThat(writer.getCacheStatistics(CACHE_NAME).getDeletes()).isOne(); + } + + @Test // GH-3236 + void nonBlockingClearShouldRemoveAllKeysByPattern() { - writer.clean(CACHE_NAME, (CACHE_NAME + "::*").getBytes(Charset.forName("UTF-8"))); + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::collectStatistics); + assumeTrue(writer.supportsAsyncRetrieve()); + + doWithConnection(connection -> { + connection.set(binaryCacheKey, binaryCacheValue); + connection.set("foo".getBytes(), "bar".getBytes()); + }); + + writer.clear(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8)); + + doWithConnection(connection -> { + Awaitility.await().pollInSameThread().pollDelay(Duration.ZERO).until(() -> !connection.exists(binaryCacheKey)); + assertThat(connection.exists(binaryCacheKey)).isFalse(); + assertThat(connection.exists("foo".getBytes())).isTrue(); + }); + + assertThat(writer.getCacheStatistics(CACHE_NAME).getDeletes()).isOne(); + } + + @Test // GH-3236 + void invalidateShouldRemoveAllKeysByPattern() { + + doWithConnection(connection -> { + connection.set(binaryCacheKey, binaryCacheValue); + connection.set("foo".getBytes(), "bar".getBytes()); + }); + + RedisCacheWriter writer = RedisCacheWriter.create(connectionFactory, config -> config.collectStatistics()); + + assertThat(writer.invalidate(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8))).isTrue(); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); @@ -303,7 +398,8 @@ void nonLockingCacheWriterShouldIgnoreExistingLock() { ((DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory)).lock(CACHE_NAME); - nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO); + RedisCacheWriter.create(connectionFactory, RedisCacheWriterConfigurer::immediateWrites).putIfAbsent(CACHE_NAME, + binaryCacheKey, binaryCacheValue, Duration.ZERO); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isTrue(); @@ -315,8 +411,8 @@ void lockingCacheWriterShouldIgnoreExistingLockOnDifferenceCache() { ((DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory)).lock(CACHE_NAME); - lockingRedisCacheWriter(connectionFactory).put(CACHE_NAME + "-no-the-other-cache", binaryCacheKey, binaryCacheValue, - Duration.ZERO); + RedisCacheWriter.create(connectionFactory, it -> it.immediateWrites().enableLocking()) + .put(CACHE_NAME + "-no-the-other-cache", binaryCacheKey, binaryCacheValue, Duration.ZERO); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isTrue(); @@ -326,8 +422,8 @@ void lockingCacheWriterShouldIgnoreExistingLockOnDifferenceCache() { @Test // DATAREDIS-481, DATAREDIS-1082 void lockingCacheWriterShouldWaitForLockRelease() throws InterruptedException { - DefaultRedisCacheWriter writer = (DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory) - .withStatisticsCollector(CacheStatisticsCollector.create()); + DefaultRedisCacheWriter writer = DefaultRedisCacheWriter.create(connectionFactory, + it -> it.enableLocking().immediateWrites().collectStatistics()); writer.lock(CACHE_NAME); CountDownLatch beforeWrite = new CountDownLatch(1); @@ -345,8 +441,6 @@ void lockingCacheWriterShouldWaitForLockRelease() throws InterruptedException { beforeWrite.await(); - Thread.sleep(200); - doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isFalse(); }); @@ -359,7 +453,6 @@ void lockingCacheWriterShouldWaitForLockRelease() throws InterruptedException { }); assertThat(writer.getCacheStatistics(CACHE_NAME).getLockWaitDuration(TimeUnit.NANOSECONDS)).isGreaterThan(0); - } finally { th.interrupt(); } @@ -368,9 +461,12 @@ void lockingCacheWriterShouldWaitForLockRelease() throws InterruptedException { @Test // DATAREDIS-481 void lockingCacheWriterShouldExitWhenInterruptedWaitForLockRelease() throws InterruptedException { - DefaultRedisCacheWriter cw = (DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory); + DefaultRedisCacheWriter cw = DefaultRedisCacheWriter.create(connectionFactory, + it -> it.enableLocking().immediateWrites()); cw.lock(CACHE_NAME); + Assumptions.assumeFalse(cw.supportsAsyncRetrieve()); + CountDownLatch beforeWrite = new CountDownLatch(1); CountDownLatch afterWrite = new CountDownLatch(1); AtomicReference exceptionRef = new AtomicReference<>(); @@ -378,7 +474,7 @@ void lockingCacheWriterShouldExitWhenInterruptedWaitForLockRelease() throws Inte Thread th = new Thread(() -> { DefaultRedisCacheWriter writer = new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50), - BatchStrategies.keys()) { + TtlFunction.persistent(), CacheStatisticsCollector.none(), BatchStrategies.keys(), false) { @Override boolean doCheckLock(String name, RedisConnection connection) { @@ -446,6 +542,30 @@ void noOpStatisticsCollectorReturnsEmptyStatsInstance() { assertThat(stats.getPuts()).isZero(); } + @ParameterizedTest // GH-3236 + @MethodSource("clearRemovesAffectedKeysArgs") + void clearRemovesAffectedKeys(BatchStrategy batchStrategy) { + + DefaultRedisCacheWriter cw = (DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory, batchStrategy) + .withStatisticsCollector(CacheStatisticsCollector.create()); + + int nrKeys = 100; + for (int i = 0; i < nrKeys; i++) { + cw.putIfAbsent(CACHE_NAME, "%s::key-%s".formatted(CACHE_NAME, i).getBytes(StandardCharsets.UTF_8), + binaryCacheValue, Duration.ofSeconds(30)); + } + + cw.clear(CACHE_NAME, (CACHE_NAME + "::*").getBytes(StandardCharsets.UTF_8)); + + doWithConnection(connection -> { + Awaitility.await().pollInSameThread().atMost(Duration.ofSeconds(5)).pollDelay(Duration.ZERO) + .until(() -> !connection.exists(binaryCacheKey)); + assertThat(connection.keyCommands().exists(binaryCacheKey)).isFalse(); + }); + + assertThat(cw.getCacheStatistics(CACHE_NAME).getDeletes()).isEqualTo(nrKeys); + } + @Test // GH-1686 @Disabled("Occasional failures on CI but not locally") void doLockShouldGetLock() throws InterruptedException { diff --git a/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java b/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java index 2fc0795872..26f04c51ed 100644 --- a/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java +++ b/src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java @@ -25,10 +25,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedClass; @@ -57,7 +59,6 @@ public class LegacyRedisCacheTests { private static final String CACHE_NAME = "testCache"; - private final boolean allowCacheNullValues; private ObjectFactory keyFactory; @@ -81,7 +82,7 @@ public static Collection testParams() { Collection params = AbstractOperationsTestParams.testParams(); - Collection target = new ArrayList<>(); + List target = new ArrayList<>(); for (Object[] source : params) { Object[] cacheNullDisabled = Arrays.copyOf(source, source.length + 1); @@ -106,7 +107,8 @@ private RedisCache createCache() { cacheConfiguration = cacheConfiguration.disableCachingNullValues(); } - return new RedisCache(CACHE_NAME, RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory), + return new RedisCache(CACHE_NAME, + RedisCacheWriter.create(connectionFactory, RedisCacheWriter.RedisCacheWriterConfigurer::immediateWrites), cacheConfiguration); } @@ -147,6 +149,7 @@ void testCacheClear() { assertThat(cache.get(key2)).isNull(); cache.put(key2, value2); cache.clear(); + assertThat(cache.get(key2)).isNull(); assertThat(cache.get(key1)).isNull(); } @@ -333,7 +336,7 @@ void cachePutWithNullShouldAddStuffToRedisWhenCachingNullIsEnabled() { @Test // DATAREDIS-553 void testCacheGetSynchronizedNullAllowingNull() { - assumeThat(allowCacheNullValues).as("Only suitable when cache does allow null values.").isTrue(); + Assumptions.assumeTrue(allowCacheNullValues, "Only suitable when cache does allow null values."); Object key = getKey(); Object value = cache.get(key, () -> null); diff --git a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java index da46400668..c0e2251fcf 100644 --- a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java +++ b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java @@ -30,10 +30,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.assertj.core.api.ThrowingConsumer; import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -102,8 +102,7 @@ void setUp() { @Test // DATAREDIS-481 void putShouldAddEntry() { - cache.put("key-1", sample); - + cache.put(key, sample); doWithConnection(connection -> assertThat(connection.exists(binaryCacheKey)).isTrue()); } @@ -133,7 +132,7 @@ void cacheShouldNotBeClearedIfNoPatternMatch() { // DATAREDIS-481 void putNullShouldAddEntryForNullValue() { - cache.put("key-1", null); + cache.put(key, null); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isTrue(); @@ -144,7 +143,7 @@ void putNullShouldAddEntryForNullValue() { @Test // DATAREDIS-481 void putIfAbsentShouldAddEntryIfNotExists() { - cache.putIfAbsent("key-1", sample); + cache.putIfAbsent(key, sample); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isTrue(); @@ -155,7 +154,7 @@ void putIfAbsentShouldAddEntryIfNotExists() { @Test // DATAREDIS-481 void putIfAbsentWithNullShouldAddNullValueEntryIfNotExists() { - assertThat(cache.putIfAbsent("key-1", null)).isNull(); + assertThat(cache.putIfAbsent(key, null)).isNull(); doWithConnection(connection -> { assertThat(connection.exists(binaryCacheKey)).isTrue(); @@ -295,6 +294,23 @@ void evictShouldRemoveKey() { }); } + @Test // GH-3236 + void evictShouldRemoveKeyIfPresent() { + + doWithConnection(connection -> { + connection.set(binaryCacheKey, binaryNullValue); + connection.set("other".getBytes(), "value".getBytes()); + }); + + assertThat(cache.evictIfPresent(key)).isTrue(); + assertThat(cache.evictIfPresent(key)).isFalse(); + + doWithConnection(connection -> { + assertThat(connection.exists(binaryCacheKey)).isFalse(); + assertThat(connection.exists("other".getBytes())).isTrue(); + }); + } + @Test // GH-2028 void clearShouldClearCache() { @@ -311,12 +327,28 @@ void clearShouldClearCache() { }); } + @Test // GH-2028 + void clearShouldInvalidateCache() { + + doWithConnection(connection -> { + connection.set(binaryCacheKey, binaryNullValue); + connection.set("other".getBytes(), "value".getBytes()); + }); + + assertThat(cache.invalidate()).isTrue(); + + doWithConnection(connection -> { + assertThat(connection.exists(binaryCacheKey)).isFalse(); + assertThat(connection.exists("other".getBytes())).isTrue(); + }); + } + @Test // GH-1721 @EnabledOnRedisDriver(RedisDriver.LETTUCE) // SCAN not supported via Jedis Cluster. void clearWithScanShouldClearCache() { RedisCache cache = new RedisCache("cache", - RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory, BatchStrategies.scan(25)), + RedisCacheWriter.create(connectionFactory, it -> it.immediateWrites().batchStrategy(BatchStrategies.scan(25))), RedisCacheConfiguration.defaultCacheConfig().serializeValuesWith(SerializationPair.fromSerializer(serializer))); doWithConnection(connection -> { @@ -364,11 +396,11 @@ void getWithCallableShouldNotResolveValueIfPresent() { void computePrefixCreatesCacheKeyCorrectly() { RedisCache cacheWithCustomPrefix = new RedisCache("cache", - RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory), + RedisCacheWriter.create(connectionFactory, RedisCacheWriter.RedisCacheWriterConfigurer::immediateWrites), RedisCacheConfiguration.defaultCacheConfig().serializeValuesWith(SerializationPair.fromSerializer(serializer)) .computePrefixWith(cacheName -> "_" + cacheName + "_")); - cacheWithCustomPrefix.put("key-1", sample); + cacheWithCustomPrefix.put(key, sample); doWithConnection( connection -> assertThat(connection.stringCommands().get("_cache_key-1".getBytes(StandardCharsets.UTF_8))) @@ -379,10 +411,11 @@ void computePrefixCreatesCacheKeyCorrectly() { void prefixCacheNameCreatesCacheKeyCorrectly() { RedisCache cacheWithCustomPrefix = new RedisCache("cache", - RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory), RedisCacheConfiguration.defaultCacheConfig() + RedisCacheWriter.create(connectionFactory, RedisCacheWriter.RedisCacheWriterConfigurer::immediateWrites), + RedisCacheConfiguration.defaultCacheConfig() .serializeValuesWith(SerializationPair.fromSerializer(serializer)).prefixCacheNameWith("redis::")); - cacheWithCustomPrefix.put("key-1", sample); + cacheWithCustomPrefix.put(key, sample); doWithConnection(connection -> assertThat( connection.stringCommands().get("redis::cache::key-1".getBytes(StandardCharsets.UTF_8))) @@ -688,16 +721,18 @@ private RedisCacheWriter usingRedisCacheWriter() { } private RedisCacheWriter usingLockingRedisCacheWriter() { - return RedisCacheWriter.lockingRedisCacheWriter(this.connectionFactory); + return usingLockingRedisCacheWriter(Duration.ofMillis(50L)); } private RedisCacheWriter usingLockingRedisCacheWriter(Duration sleepTime) { - return RedisCacheWriter.lockingRedisCacheWriter(this.connectionFactory, sleepTime, - RedisCacheWriter.TtlFunction.persistent(), BatchStrategies.keys()); + return RedisCacheWriter.create(this.connectionFactory, config -> { + config.immediateWrites().enableLocking(it -> it.sleepTime(sleepTime)).batchStrategy(BatchStrategies.keys()); + }); } private RedisCacheWriter usingNonLockingRedisCacheWriter() { - return RedisCacheWriter.nonLockingRedisCacheWriter(this.connectionFactory); + return RedisCacheWriter.create(this.connectionFactory, + config -> config.immediateWrites().batchStrategy(BatchStrategies.keys())); } private @Nullable Object unwrap(@Nullable Object value) { @@ -712,7 +747,7 @@ private Function withTtiExpira return entryTtlFunction.andThen(RedisCacheConfiguration::enableTimeToIdle); } - void doWithConnection(Consumer callback) { + void doWithConnection(ThrowingConsumer callback) { try (RedisConnection connection = connectionFactory.getConnection()) { callback.accept(connection); }