@@ -16,6 +16,7 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
1616import { RedisLegacyClient , RedisLegacyClientType } from './legacy-mode' ;
1717import { RedisPoolOptions , RedisClientPool } from './pool' ;
1818import { RedisVariadicArgument , parseArgs , pushVariadicArguments } from '../commands/generic-transformers' ;
19+ import { BasicClientSideCache , ClientSideCacheConfig , ClientSideCacheProvider } from './cache' ;
1920import { BasicCommandParser , CommandParser } from './parser' ;
2021
2122export interface RedisClientOptions <
@@ -80,6 +81,10 @@ export interface RedisClientOptions<
8081 * TODO
8182 */
8283 commandOptions ?: CommandOptions < TYPE_MAPPING > ;
84+ /**
85+ * TODO
86+ */
87+ clientSideCache ?: ClientSideCacheProvider | ClientSideCacheConfig ;
8388}
8489
8590type WithCommands <
@@ -303,9 +308,8 @@ export default class RedisClient<
303308 // was in a watch transaction when
304309 // a topology change occured
305310 #dirtyWatch?: string ;
306- #epoch: number ;
307311 #watchEpoch?: number ;
308-
312+ #clientSideCache?: ClientSideCacheProvider ;
309313 #credentialsSubscription: Disposable | null = null ;
310314
311315 get options ( ) : RedisClientOptions < M , F , S , RESP > | undefined {
@@ -324,6 +328,11 @@ export default class RedisClient<
324328 return this . _self . #queue. isPubSubActive ;
325329 }
326330
331+ get socketEpoch ( ) {
332+ return this . _self . #socket. socketEpoch ;
333+ }
334+
335+
327336 get isWatching ( ) {
328337 return this . _self . #watchEpoch !== undefined ;
329338 }
@@ -348,10 +357,20 @@ export default class RedisClient<
348357
349358 constructor ( options ?: RedisClientOptions < M , F , S , RESP , TYPE_MAPPING > ) {
350359 super ( ) ;
360+
351361 this . #options = this . #initiateOptions( options ) ;
352362 this . #queue = this . #initiateQueue( ) ;
353363 this . #socket = this . #initiateSocket( ) ;
354- this . #epoch = 0 ;
364+
365+ if ( options ?. clientSideCache ) {
366+ if ( options . clientSideCache instanceof ClientSideCacheProvider ) {
367+ this . #clientSideCache = options . clientSideCache ;
368+ } else {
369+ const cscConfig = options . clientSideCache ;
370+ this . #clientSideCache = new BasicClientSideCache ( cscConfig ) ;
371+ }
372+ this . #queue. setInvalidateCallback ( this . #clientSideCache. invalidate . bind ( this . #clientSideCache) ) ;
373+ }
355374 }
356375
357376 #initiateOptions( options ?: RedisClientOptions < M , F , S , RESP , TYPE_MAPPING > ) : RedisClientOptions < M , F , S , RESP , TYPE_MAPPING > | undefined {
@@ -512,6 +531,13 @@ export default class RedisClient<
512531 ) ;
513532 }
514533
534+ if ( this . #clientSideCache) {
535+ const tracking = this . #clientSideCache. trackingOn ( ) ;
536+ if ( tracking ) {
537+ commands . push ( tracking ) ;
538+ }
539+ }
540+
515541 return commands ;
516542 }
517543
@@ -565,6 +591,7 @@ export default class RedisClient<
565591 } )
566592 . on ( 'error' , err => {
567593 this . emit ( 'error' , err ) ;
594+ this . #clientSideCache?. onError ( ) ;
568595 if ( this . #socket. isOpen && ! this . #options?. disableOfflineQueue ) {
569596 this . #queue. flushWaitingForReply ( err ) ;
570597 } else {
@@ -573,7 +600,6 @@ export default class RedisClient<
573600 } )
574601 . on ( 'connect' , ( ) => this . emit ( 'connect' ) )
575602 . on ( 'ready' , ( ) => {
576- this . #epoch++ ;
577603 this . emit ( 'ready' ) ;
578604 this . #setPingTimer( ) ;
579605 this . #maybeScheduleWrite( ) ;
@@ -701,14 +727,21 @@ export default class RedisClient<
701727 commandOptions : CommandOptions < TYPE_MAPPING > | undefined ,
702728 transformReply : TransformReply | undefined ,
703729 ) {
704- const reply = await this . sendCommand ( parser . redisArgs , commandOptions ) ;
730+ const csc = this . _self . #clientSideCache;
731+ const defaultTypeMapping = this . _self . #options?. commandOptions === commandOptions ;
705732
706- if ( transformReply ) {
707- const res = transformReply ( reply , parser . preserve , commandOptions ?. typeMapping ) ;
708- return res
709- }
733+ const fn = ( ) => { return this . sendCommand ( parser . redisArgs , commandOptions ) } ;
710734
711- return reply ;
735+ if ( csc && command . CACHEABLE && defaultTypeMapping ) {
736+ return await csc . handleCache ( this . _self , parser as BasicCommandParser , fn , transformReply , commandOptions ?. typeMapping ) ;
737+ } else {
738+ const reply = await fn ( ) ;
739+
740+ if ( transformReply ) {
741+ return transformReply ( reply , parser . preserve , commandOptions ?. typeMapping ) ;
742+ }
743+ return reply ;
744+ }
712745 }
713746
714747 /**
@@ -873,7 +906,7 @@ export default class RedisClient<
873906 const reply = await this . _self . sendCommand (
874907 pushVariadicArguments ( [ 'WATCH' ] , key )
875908 ) ;
876- this . _self . #watchEpoch ??= this . _self . #epoch ;
909+ this . _self . #watchEpoch ??= this . _self . socketEpoch ;
877910 return reply as unknown as ReplyWithTypeMapping < SimpleStringReply < 'OK' > , TYPE_MAPPING > ;
878911 }
879912
@@ -940,7 +973,7 @@ export default class RedisClient<
940973 }
941974
942975 const chainId = Symbol ( 'Pipeline Chain' ) ,
943- promise = Promise . all (
976+ promise = Promise . allSettled (
944977 commands . map ( ( { args } ) => this . _self . #queue. addCommand ( args , {
945978 chainId,
946979 typeMapping : this . _commandOptions ?. typeMapping
@@ -976,7 +1009,7 @@ export default class RedisClient<
9761009 throw new WatchError ( dirtyWatch ) ;
9771010 }
9781011
979- if ( watchEpoch && watchEpoch !== this . _self . #epoch ) {
1012+ if ( watchEpoch && watchEpoch !== this . _self . socketEpoch ) {
9801013 throw new WatchError ( 'Client reconnected after WATCH' ) ;
9811014 }
9821015
@@ -1200,6 +1233,7 @@ export default class RedisClient<
12001233 return new Promise < void > ( resolve => {
12011234 clearTimeout ( this . _self . #pingTimer) ;
12021235 this . _self . #socket. close ( ) ;
1236+ this . _self . #clientSideCache?. onClose ( ) ;
12031237
12041238 if ( this . _self . #queue. isEmpty ( ) ) {
12051239 this . _self . #socket. destroySocket ( ) ;
@@ -1226,6 +1260,7 @@ export default class RedisClient<
12261260 clearTimeout ( this . _self . #pingTimer) ;
12271261 this . _self . #queue. flushAll ( new DisconnectsClientError ( ) ) ;
12281262 this . _self . #socket. destroy ( ) ;
1263+ this . _self . #clientSideCache?. onClose ( ) ;
12291264 this . _self . #credentialsSubscription?. dispose ( ) ;
12301265 this . _self . #credentialsSubscription = null ;
12311266 }
0 commit comments