@@ -46,17 +46,18 @@ public L1L2RedisCache(
4646 . Deserialize < CacheMessage > (
4747 message . ToString ( ) ,
4848 JsonSerializerOptions ) ;
49- if ( cacheMessage . PublisherId != PublisherId )
49+ if ( cacheMessage ? . PublisherId != PublisherId )
5050 {
5151 MemoryCache . Remove (
52- $ "{ KeyPrefix } { cacheMessage . Key } ") ;
52+ $ "{ KeyPrefix } { cacheMessage ? . Key } ") ;
5353 MemoryCache . Remove (
54- $ "{ LockKeyPrefix } { cacheMessage . Key } ") ;
54+ $ "{ LockKeyPrefix } { cacheMessage ? . Key } ") ;
5555 }
5656 } ) ;
5757 }
5858
59- private static object LockKeyLock { get ; } = new object ( ) ;
59+ private static SemaphoreSlim KeySemaphore { get ; } =
60+ new SemaphoreSlim ( 1 , 1 ) ;
6061
6162 public string Channel { get ; }
6263 public IDatabase Database { get ; }
@@ -69,7 +70,7 @@ public L1L2RedisCache(
6970 public RedisCacheOptions RedisCacheOptions { get ; }
7071 public ISubscriber Subscriber { get ; }
7172
72- public byte [ ] Get ( string key )
73+ public byte [ ] ? Get ( string key )
7374 {
7475 var value = MemoryCache . Get (
7576 $ "{ KeyPrefix } { key } ") as byte [ ] ;
@@ -79,10 +80,11 @@ public byte[] Get(string key)
7980 if ( Database . KeyExists (
8081 $ "{ KeyPrefix } { key } ") )
8182 {
82- var localLock = GetOrCreateLock (
83+ var semaphore = GetOrCreateLock (
8384 key ,
8485 null ) ;
85- lock ( localLock )
86+ semaphore . Wait ( ) ;
87+ try
8688 {
8789 var hashEntries = GetHashEntries ( key ) ;
8890 var distributedCacheEntryOptions = hashEntries
@@ -95,16 +97,20 @@ public byte[] Get(string key)
9597 distributedCacheEntryOptions ) ;
9698 SetLock (
9799 key ,
98- localLock ,
100+ semaphore ,
99101 distributedCacheEntryOptions ) ;
100102 }
103+ finally
104+ {
105+ semaphore . Release ( ) ;
106+ }
101107 }
102108 }
103109
104110 return value ;
105111 }
106112
107- public async Task < byte [ ] > GetAsync (
113+ public async Task < byte [ ] ? > GetAsync (
108114 string key ,
109115 CancellationToken cancellationToken = default )
110116 {
@@ -116,12 +122,14 @@ public async Task<byte[]> GetAsync(
116122 if ( await Database . KeyExistsAsync (
117123 $ "{ KeyPrefix } { key } ") )
118124 {
119- var localLock = await GetOrCreateLockAsync (
125+ var semaphore = await GetOrCreateLockAsync (
120126 key ,
121- null ) ;
122- lock ( localLock )
127+ null ,
128+ cancellationToken ) ;
129+ await semaphore . WaitAsync ( cancellationToken ) ;
130+ try
123131 {
124- var hashEntries = GetHashEntries ( key ) ;
132+ var hashEntries = await GetHashEntriesAsync ( key ) ;
125133 var distributedCacheEntryOptions = hashEntries
126134 . GetDistributedCacheEntryOptions ( ) ;
127135 value = hashEntries . GetRedisValue ( ) ;
@@ -132,9 +140,13 @@ public async Task<byte[]> GetAsync(
132140 distributedCacheEntryOptions ) ;
133141 SetLock (
134142 key ,
135- localLock ,
143+ semaphore ,
136144 distributedCacheEntryOptions ) ;
137145 }
146+ finally
147+ {
148+ semaphore . Release ( ) ;
149+ }
138150 }
139151 }
140152
@@ -155,7 +167,9 @@ public async Task RefreshAsync(
155167
156168 public void Remove ( string key )
157169 {
158- lock ( GetOrCreateLock ( key , null ) )
170+ var semaphore = GetOrCreateLock ( key , null ) ;
171+ semaphore . Wait ( ) ;
172+ try
159173 {
160174 DistributedCache . Remove ( key ) ;
161175 MemoryCache . Remove (
@@ -171,14 +185,20 @@ public void Remove(string key)
171185 JsonSerializerOptions ) ) ;
172186 MemoryCache . Remove ( $ "{ LockKeyPrefix } { key } ") ;
173187 }
188+ finally
189+ {
190+ semaphore . Release ( ) ;
191+ }
174192 }
175193
176194 public async Task RemoveAsync (
177195 string key ,
178196 CancellationToken cancellationToken = default )
179197 {
180- lock ( await GetOrCreateLockAsync (
181- key , null , cancellationToken ) )
198+ var semaphore = await GetOrCreateLockAsync (
199+ key , null , cancellationToken ) ;
200+ await semaphore . WaitAsync ( cancellationToken ) ;
201+ try
182202 {
183203 DistributedCache . Remove ( key ) ;
184204 MemoryCache . Remove (
@@ -194,14 +214,21 @@ public async Task RemoveAsync(
194214 JsonSerializerOptions ) ) ;
195215 MemoryCache . Remove ( $ "{ LockKeyPrefix } { key } ") ;
196216 }
217+ finally
218+ {
219+ semaphore . Release ( ) ;
220+ }
197221 }
198222
199223 public void Set (
200224 string key ,
201225 byte [ ] value ,
202226 DistributedCacheEntryOptions distributedCacheEntryOptions )
203227 {
204- lock ( GetOrCreateLock ( key , distributedCacheEntryOptions ) )
228+ var semaphore = GetOrCreateLock (
229+ key , distributedCacheEntryOptions ) ;
230+ semaphore . Wait ( ) ;
231+ try
205232 {
206233 DistributedCache . Set (
207234 key , value , distributedCacheEntryOptions ) ;
@@ -217,6 +244,10 @@ public void Set(
217244 } ,
218245 JsonSerializerOptions ) ) ;
219246 }
247+ finally
248+ {
249+ semaphore . Release ( ) ;
250+ }
220251 }
221252
222253 public async Task SetAsync (
@@ -225,14 +256,19 @@ public async Task SetAsync(
225256 DistributedCacheEntryOptions distributedCacheEntryOptions ,
226257 CancellationToken cancellationToken = default )
227258 {
228- lock ( await GetOrCreateLockAsync (
229- key , distributedCacheEntryOptions , cancellationToken ) )
259+ var semaphore = await GetOrCreateLockAsync (
260+ key , distributedCacheEntryOptions , cancellationToken ) ;
261+ await semaphore . WaitAsync ( cancellationToken ) ;
262+ try
230263 {
231- DistributedCache . Set (
232- key , value , distributedCacheEntryOptions ) ;
264+ await DistributedCache . SetAsync (
265+ key ,
266+ value ,
267+ distributedCacheEntryOptions ,
268+ cancellationToken ) ;
233269 SetMemoryCache (
234270 key , value , distributedCacheEntryOptions ) ;
235- Subscriber . Publish (
271+ await Subscriber . PublishAsync (
236272 Channel ,
237273 JsonSerializer . Serialize (
238274 new CacheMessage
@@ -242,11 +278,15 @@ public async Task SetAsync(
242278 } ,
243279 JsonSerializerOptions ) ) ;
244280 }
281+ finally
282+ {
283+ semaphore . Release ( ) ;
284+ }
245285 }
246286
247287 private HashEntry [ ] GetHashEntries ( string key )
248288 {
249- var hashEntries = new HashEntry [ ] { } ;
289+ var hashEntries = Array . Empty < HashEntry > ( ) ;
250290
251291 try
252292 {
@@ -258,11 +298,26 @@ private HashEntry[] GetHashEntries(string key)
258298 return hashEntries ;
259299 }
260300
261- private object GetOrCreateLock (
301+ private async Task < HashEntry [ ] > GetHashEntriesAsync ( string key )
302+ {
303+ var hashEntries = Array . Empty < HashEntry > ( ) ;
304+
305+ try
306+ {
307+ hashEntries = await Database . HashGetAllAsync (
308+ $ "{ KeyPrefix } { key } ") ;
309+ }
310+ catch ( RedisServerException ) { }
311+
312+ return hashEntries ;
313+ }
314+
315+ private SemaphoreSlim GetOrCreateLock (
262316 string key ,
263- DistributedCacheEntryOptions distributedCacheEntryOptions )
317+ DistributedCacheEntryOptions ? distributedCacheEntryOptions )
264318 {
265- lock ( LockKeyLock )
319+ KeySemaphore . Wait ( ) ;
320+ try
266321 {
267322 return MemoryCache . GetOrCreate (
268323 $ "{ LockKeyPrefix } { key } ",
@@ -274,20 +329,25 @@ private object GetOrCreateLock(
274329 distributedCacheEntryOptions ? . AbsoluteExpirationRelativeToNow ;
275330 cacheEntry . SlidingExpiration =
276331 distributedCacheEntryOptions ? . SlidingExpiration ;
277- return new object ( ) ;
332+ return new SemaphoreSlim ( 1 , 1 ) ;
278333 } ) ??
279- new object ( ) ;
334+ new SemaphoreSlim ( 1 , 1 ) ;
335+ }
336+ finally
337+ {
338+ KeySemaphore . Release ( ) ;
280339 }
281340 }
282341
283- private Task < object > GetOrCreateLockAsync (
342+ private async Task < SemaphoreSlim > GetOrCreateLockAsync (
284343 string key ,
285- DistributedCacheEntryOptions distributedCacheEntryOptions ,
344+ DistributedCacheEntryOptions ? distributedCacheEntryOptions ,
286345 CancellationToken cancellationToken = default )
287346 {
288- lock ( LockKeyLock )
347+ await KeySemaphore . WaitAsync ( cancellationToken ) ;
348+ try
289349 {
290- return Task . FromResult ( MemoryCache . GetOrCreate (
350+ return await MemoryCache . GetOrCreateAsync (
291351 $ "{ LockKeyPrefix } { key } ",
292352 cacheEntry =>
293353 {
@@ -297,17 +357,20 @@ private Task<object> GetOrCreateLockAsync(
297357 distributedCacheEntryOptions ? . AbsoluteExpirationRelativeToNow ;
298358 cacheEntry . SlidingExpiration =
299359 distributedCacheEntryOptions ? . SlidingExpiration ;
300- return new object ( ) ;
360+ return Task . FromResult ( new SemaphoreSlim ( 1 , 1 ) ) ;
301361 } ) ??
302- new object ( ) ) ;
362+ new SemaphoreSlim ( 1 , 1 ) ;
363+ }
364+ finally
365+ {
366+ KeySemaphore . Release ( ) ;
303367 }
304368 }
305369
306- private object SetLock (
370+ private SemaphoreSlim SetLock (
307371 string key ,
308- object value ,
309- DistributedCacheEntryOptions distributedCacheEntryOptions ,
310- CancellationToken cancellationToken = default )
372+ SemaphoreSlim semaphore ,
373+ DistributedCacheEntryOptions distributedCacheEntryOptions )
311374 {
312375 var memoryCacheEntryOptions = new MemoryCacheEntryOptions
313376 {
@@ -321,7 +384,7 @@ private object SetLock(
321384
322385 return MemoryCache . Set (
323386 $ "{ LockKeyPrefix } { key } ",
324- value ,
387+ semaphore ,
325388 memoryCacheEntryOptions ) ;
326389 }
327390
0 commit comments