2727import java .util .function .Consumer ;
2828import java .util .function .Function ;
2929import java .util .function .Supplier ;
30+ import java .util .stream .Collectors ;
3031
3132import org .jspecify .annotations .Nullable ;
33+
3234import org .springframework .dao .PessimisticLockingFailureException ;
35+ import org .springframework .data .redis .connection .ReactiveKeyCommands ;
3336import org .springframework .data .redis .connection .ReactiveRedisConnection ;
3437import org .springframework .data .redis .connection .ReactiveRedisConnectionFactory ;
3538import org .springframework .data .redis .connection .ReactiveStringCommands ;
3639import org .springframework .data .redis .connection .RedisConnection ;
3740import org .springframework .data .redis .connection .RedisConnectionFactory ;
3841import org .springframework .data .redis .connection .RedisStringCommands ;
3942import org .springframework .data .redis .connection .RedisStringCommands .SetOption ;
43+ import org .springframework .data .redis .core .ScanOptions ;
4044import org .springframework .data .redis .core .types .Expiration ;
4145import org .springframework .data .redis .util .ByteUtils ;
4246import org .springframework .util .Assert ;
@@ -311,8 +315,20 @@ public void remove(String name, byte[] key) {
311315 Assert .notNull (name , "Name must not be null" );
312316 Assert .notNull (key , "Key must not be null" );
313317
314- execute (name , connection -> connection .keyCommands ().del (key ));
318+ if (supportsAsyncRetrieve ()) {
319+ asyncCacheWriter .remove (name , key ).thenRun (() -> statistics .incDeletes (name ));
320+ } else {
321+ removeIfPresent (name , key );
322+ }
323+ }
324+
325+ @ Override
326+ public boolean removeIfPresent (String name , byte [] key ) {
327+
328+ Long removals = execute (name , connection -> connection .keyCommands ().del (key ));
315329 statistics .incDeletes (name );
330+
331+ return removals > 0 ;
316332 }
317333
318334 @ Override
@@ -321,7 +337,22 @@ public void clean(String name, byte[] pattern) {
321337 Assert .notNull (name , "Name must not be null" );
322338 Assert .notNull (pattern , "Pattern must not be null" );
323339
324- execute (name , connection -> {
340+ if (supportsAsyncRetrieve ()) {
341+ asyncCacheWriter .clean (name , pattern , batchStrategy )
342+ .thenAccept (deleteCount -> statistics .incDeletesBy (name , deleteCount .intValue ()));
343+ return ;
344+ }
345+
346+ invalidate (name , pattern );
347+ }
348+
349+ @ Override
350+ public boolean invalidate (String name , byte [] pattern ) {
351+
352+ Assert .notNull (name , "Name must not be null" );
353+ Assert .notNull (pattern , "Pattern must not be null" );
354+
355+ return execute (name , connection -> {
325356
326357 try {
327358 if (isLockingCacheWriter ()) {
@@ -337,13 +368,12 @@ public void clean(String name, byte[] pattern) {
337368
338369 statistics .incDeletesBy (name , (int ) deleteCount );
339370
371+ return deleteCount > 0 ;
340372 } finally {
341373 if (isLockingCacheWriter ()) {
342374 doUnlock (name , connection );
343375 }
344376 }
345-
346- return "OK" ;
347377 });
348378 }
349379
@@ -499,6 +529,25 @@ interface AsyncCacheWriter {
499529 */
500530 CompletableFuture <Void > store (String name , byte [] key , byte [] value , @ Nullable Duration ttl );
501531
532+ /**
533+ * Remove a cache entry asynchronously.
534+ *
535+ * @param name the cache name which to store the cache entry to.
536+ * @param key the key for the cache entry. Must not be {@literal null}.
537+ * @return a future that signals completion.
538+ */
539+ CompletableFuture <Void > remove (String name , byte [] key );
540+
541+ /**
542+ * Clear the cache asynchronously.
543+ *
544+ * @param name the cache name which to store the cache entry to.
545+ * @param pattern {@link String pattern} used to match Redis keys to clear.
546+ * @param batchStrategy strategy to use.
547+ * @return a future that signals completion emitting the number of removed keys.
548+ */
549+ CompletableFuture <Long > clean (String name , byte [] pattern , BatchStrategy batchStrategy );
550+
502551 }
503552
504553 /**
@@ -524,6 +573,17 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
524573 public CompletableFuture <Void > store (String name , byte [] key , byte [] value , @ Nullable Duration ttl ) {
525574 throw new UnsupportedOperationException ("async store not supported" );
526575 }
576+
577+ @ Override
578+ public CompletableFuture <Void > remove (String name , byte [] key ) {
579+ throw new UnsupportedOperationException ("async remove not supported" );
580+ }
581+
582+ @ Override
583+ public CompletableFuture <Long > clean (String name , byte [] pattern , BatchStrategy batchStrategy ) {
584+ throw new UnsupportedOperationException ("async clean not supported" );
585+ }
586+
527587 }
528588
529589 /**
@@ -534,6 +594,14 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
534594 */
535595 class AsynchronousCacheWriterDelegate implements AsyncCacheWriter {
536596
597+ private static final int DEFAULT_SCAN_BATCH_SIZE = 64 ;
598+ private final int cleanBatchSize ;
599+
600+ public AsynchronousCacheWriterDelegate () {
601+ this .cleanBatchSize = batchStrategy instanceof BatchStrategies .Scan scan ? scan .batchSize
602+ : DEFAULT_SCAN_BATCH_SIZE ;
603+ }
604+
537605 @ Override
538606 public boolean isSupported () {
539607 return true ;
@@ -561,20 +629,12 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
561629
562630 return doWithConnection (connection -> {
563631
564- Mono <?> mono = isLockingCacheWriter () ? doStoreWithLocking (name , key , value , ttl , connection )
565- : doStore (key , value , ttl , connection );
632+ Mono <?> mono = doWithLocking (name , key , value , connection , () -> doStore (key , value , ttl , connection ));
566633
567634 return mono .then ().toFuture ();
568635 });
569636 }
570637
571- private Mono <Boolean > doStoreWithLocking (String name , byte [] key , byte [] value , @ Nullable Duration ttl ,
572- ReactiveRedisConnection connection ) {
573-
574- return Mono .usingWhen (doLock (name , key , value , connection ), unused -> doStore (key , value , ttl , connection ),
575- unused -> doUnlock (name , connection ));
576- }
577-
578638 @ SuppressWarnings ("NullAway" )
579639 private Mono <Boolean > doStore (byte [] cacheKey , byte [] value , @ Nullable Duration ttl ,
580640 ReactiveRedisConnection connection ) {
@@ -590,6 +650,65 @@ private Mono<Boolean> doStore(byte[] cacheKey, byte[] value, @Nullable Duration
590650 }
591651 }
592652
653+ @ Override
654+ public CompletableFuture <Void > remove (String name , byte [] key ) {
655+
656+ return doWithConnection (connection -> {
657+
658+ Mono <?> mono = doWithLocking (name , key , null , connection , () -> doRemove (key , connection ));
659+
660+ return mono .then ().toFuture ();
661+ });
662+ }
663+
664+ @ Override
665+ public CompletableFuture <Long > clean (String name , byte [] pattern , BatchStrategy batchStrategy ) {
666+
667+ return doWithConnection (connection -> {
668+
669+ Mono <Long > mono = doWithLocking (name , pattern , null , connection , () -> doClean (pattern , connection ));
670+
671+ return mono .toFuture ();
672+ });
673+ }
674+
675+ private Mono <Long > doClean (byte [] pattern , ReactiveRedisConnection connection ) {
676+
677+ ReactiveKeyCommands commands = connection .keyCommands ();
678+
679+ Flux <ByteBuffer > keys ;
680+
681+ if (batchStrategy instanceof BatchStrategies .Keys ) {
682+ keys = commands .keys (ByteBuffer .wrap (pattern )).flatMapMany (Flux ::fromIterable );
683+ } else {
684+ keys = commands .scan (ScanOptions .scanOptions ().count (cleanBatchSize ).match (pattern ).build ());
685+ }
686+
687+ return keys
688+ .buffer (cleanBatchSize ) //
689+ .flatMap (commands ::mUnlink ) //
690+ .collect (Collectors .summingLong (Long ::longValue ));
691+ }
692+
693+ @ SuppressWarnings ("NullAway" )
694+ private Mono <Long > doRemove (byte [] cacheKey , ReactiveRedisConnection connection ) {
695+
696+ ByteBuffer wrappedKey = ByteBuffer .wrap (cacheKey );
697+
698+ return connection .keyCommands ().unlink (wrappedKey );
699+ }
700+
701+ private <T > Mono <T > doWithLocking (String name , byte [] key , byte @ Nullable [] value ,
702+ ReactiveRedisConnection connection , Supplier <Mono <T >> action ) {
703+
704+ if (isLockingCacheWriter ()) {
705+ return Mono .usingWhen (doLock (name , key , value , connection ), unused -> action .get (),
706+ unused -> doUnlock (name , connection ));
707+ }
708+
709+ return action .get ();
710+ }
711+
593712 private Mono <Object > doLock (String name , Object contextualKey , @ Nullable Object contextualValue ,
594713 ReactiveRedisConnection connection ) {
595714
@@ -631,5 +750,7 @@ private <T> CompletableFuture<T> doWithConnection(
631750 ReactiveRedisConnection ::closeLater ) //
632751 .toFuture ();
633752 }
753+
634754 }
755+
635756}
0 commit comments