Skip to content

Commit e3df77f

Browse files
Jerrycnblogs-dudu
authored andcommitted
fix: fix GetValueAsync concurrent issue
1 parent bfdd133 commit e3df77f

File tree

2 files changed

+219
-22
lines changed

2 files changed

+219
-22
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ packages/*/
1010
project.lock.json
1111
/publish/*.bat
1212
.vs/
13+
.idea

Enyim.Caching/Memcached/MemcachedNode.cs

Lines changed: 218 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class MemcachedNode : IMemcachedNode
3131
private readonly ISocketPoolConfiguration config;
3232
private InternalPoolImpl internalPoolImpl;
3333
private bool isInitialized;
34+
private SemaphoreSlim poolInitSemaphore = new SemaphoreSlim(1, 1);
3435

3536
public MemcachedNode(
3637
EndPoint endpoint,
@@ -96,7 +97,9 @@ public bool Ping()
9697
if (this.isDisposed) return false;
9798

9899
// try to connect to the server
99-
using (var socket = this.CreateSocket()) ;
100+
using (var socket = this.CreateSocket())
101+
{
102+
}
100103

101104
if (this.internalPoolImpl.IsAlive)
102105
return true;
@@ -117,23 +120,25 @@ public bool Ping()
117120
//could not reconnect
118121
catch { return false; }
119122
}
120-
123+
121124
/// <summary>
122125
/// Acquires a new item from the pool
123126
/// </summary>
124127
/// <returns>An <see cref="T:PooledSocket"/> instance which is connected to the memcached server, or <value>null</value> if the pool is dead.</returns>
125128
public IPooledSocketResult Acquire()
126129
{
127130
if (!this.isInitialized)
128-
lock (this.internalPoolImpl)
129-
if (!this.isInitialized)
130-
{
131-
var startTime = DateTime.Now;
132-
this.internalPoolImpl.InitPool();
133-
this.isInitialized = true;
134-
_logger.LogInformation("MemcachedInitPool-cost: {0}ms", (DateTime.Now - startTime).TotalMilliseconds);
135-
}
136-
131+
{
132+
poolInitSemaphore.Wait();
133+
if (!this.isInitialized)
134+
{
135+
var startTime = DateTime.Now;
136+
this.internalPoolImpl.InitPool();
137+
this.isInitialized = true;
138+
_logger.LogInformation("MemcachedInitPool-cost: {0}ms", (DateTime.Now - startTime).TotalMilliseconds);
139+
poolInitSemaphore.Release();
140+
}
141+
}
137142
try
138143
{
139144
return this.internalPoolImpl.Acquire();
@@ -148,6 +153,38 @@ public IPooledSocketResult Acquire()
148153
}
149154
}
150155

156+
/// <summary>
157+
/// Acquires a new item from the pool
158+
/// </summary>
159+
/// <returns>An <see cref="T:PooledSocket"/> instance which is connected to the memcached server, or <value>null</value> if the pool is dead.</returns>
160+
public async Task<IPooledSocketResult> AcquireAsync()
161+
{
162+
if (!this.isInitialized)
163+
{
164+
await poolInitSemaphore.WaitAsync();
165+
if (!this.isInitialized)
166+
{
167+
var startTime = DateTime.Now;
168+
await this.internalPoolImpl.InitPoolAsync();
169+
this.isInitialized = true;
170+
_logger.LogInformation("MemcachedInitPool-cost: {0}ms", (DateTime.Now - startTime).TotalMilliseconds);
171+
poolInitSemaphore.Release();
172+
}
173+
}
174+
try
175+
{
176+
return await this.internalPoolImpl.AcquireAsync();
177+
}
178+
catch (Exception e)
179+
{
180+
var message = "Acquire failed. Maybe we're already disposed?";
181+
_logger.LogError(message, e);
182+
var result = new PooledSocketResult();
183+
result.Fail(message, e);
184+
return result;
185+
}
186+
}
187+
151188
~MemcachedNode()
152189
{
153190
try { ((IDisposable)this).Dispose(); }
@@ -203,7 +240,7 @@ private class InternalPoolImpl : IDisposable
203240
private MemcachedNode ownerNode;
204241
private readonly EndPoint _endPoint;
205242
private readonly TimeSpan queueTimeout;
206-
private Semaphore semaphore;
243+
private SemaphoreSlim semaphore;
207244

208245
private readonly object initLock = new Object();
209246

@@ -227,13 +264,13 @@ internal InternalPoolImpl(
227264
this.minItems = config.MinPoolSize;
228265
this.maxItems = config.MaxPoolSize;
229266

230-
this.semaphore = new Semaphore(maxItems, maxItems);
267+
this.semaphore = new SemaphoreSlim(maxItems, maxItems);
231268
this.freeItems = new InterlockedStack<PooledSocket>();
232269

233270
_logger = logger;
234271
_isDebugEnabled = _logger.IsEnabled(LogLevel.Debug);
235272
}
236-
273+
237274
internal void InitPool()
238275
{
239276
try
@@ -262,6 +299,41 @@ internal void InitPool()
262299
}
263300
}
264301

302+
internal async Task InitPoolAsync()
303+
{
304+
try
305+
{
306+
if (this.minItems > 0)
307+
{
308+
for (int i = 0; i < this.minItems; i++)
309+
{
310+
this.freeItems.Push(await this.CreateSocketAsync());
311+
312+
// cannot connect to the server
313+
if (!this.isAlive)
314+
break;
315+
}
316+
}
317+
318+
if (_logger.IsEnabled(LogLevel.Debug))
319+
_logger.LogDebug("Pool has been inited for {0} with {1} sockets", this.endPoint, this.minItems);
320+
321+
}
322+
catch (Exception e)
323+
{
324+
_logger.LogError("Could not init pool.", new EventId(0), e);
325+
326+
this.MarkAsDead();
327+
}
328+
}
329+
330+
private async Task<PooledSocket> CreateSocketAsync()
331+
{
332+
var ps = await this.ownerNode.CreateSocketAsync();
333+
ps.CleanupCallback = this.ReleaseSocket;
334+
335+
return ps;
336+
}
265337
private PooledSocket CreateSocket()
266338
{
267339
var ps = this.ownerNode.CreateSocket();
@@ -279,7 +351,7 @@ public DateTime MarkedAsDeadUtc
279351
{
280352
get { return this.markedAsDeadUtc; }
281353
}
282-
354+
283355
/// <summary>
284356
/// Acquires a new item from the pool
285357
/// </summary>
@@ -303,7 +375,7 @@ public IPooledSocketResult Acquire()
303375

304376
PooledSocket retval = null;
305377

306-
if (!this.semaphore.WaitOne(this.queueTimeout))
378+
if (!this.semaphore.Wait(this.queueTimeout))
307379
{
308380
message = "Pool is full, timeouting. " + _endPoint;
309381
if (_isDebugEnabled) _logger.LogDebug(message);
@@ -387,6 +459,113 @@ public IPooledSocketResult Acquire()
387459
return result;
388460
}
389461

462+
/// <summary>
463+
/// Acquires a new item from the pool
464+
/// </summary>
465+
/// <returns>An <see cref="T:PooledSocket"/> instance which is connected to the memcached server, or <value>null</value> if the pool is dead.</returns>
466+
public async Task<IPooledSocketResult> AcquireAsync()
467+
{
468+
var result = new PooledSocketResult();
469+
var message = string.Empty;
470+
471+
if (_isDebugEnabled) _logger.LogDebug("Acquiring stream from pool. " + this.endPoint);
472+
473+
if (!this.isAlive || this.isDisposed)
474+
{
475+
message = "Pool is dead or disposed, returning null. " + this.endPoint;
476+
result.Fail(message);
477+
478+
if (_isDebugEnabled) _logger.LogDebug(message);
479+
480+
return result;
481+
}
482+
483+
PooledSocket retval = null;
484+
485+
if (!await this.semaphore.WaitAsync(this.queueTimeout))
486+
{
487+
message = "Pool is full, timeouting. " + this.endPoint;
488+
if (_isDebugEnabled) _logger.LogDebug(message);
489+
result.Fail(message, new TimeoutException());
490+
491+
// everyone is so busy
492+
return result;
493+
}
494+
495+
// maybe we died while waiting
496+
if (!this.isAlive)
497+
{
498+
message = "Pool is dead, returning null. " + this.endPoint;
499+
if (_isDebugEnabled) _logger.LogDebug(message);
500+
result.Fail(message);
501+
502+
return result;
503+
}
504+
505+
// do we have free items?
506+
if (this.freeItems.TryPop(out retval))
507+
{
508+
#region [ get it from the pool ]
509+
510+
try
511+
{
512+
retval.Reset();
513+
514+
message = "Socket was reset. " + retval.InstanceId;
515+
if (_isDebugEnabled) _logger.LogDebug(message);
516+
517+
result.Pass(message);
518+
result.Value = retval;
519+
return result;
520+
}
521+
catch (Exception e)
522+
{
523+
message = "Failed to reset an acquired socket.";
524+
_logger.LogError(message, e);
525+
526+
this.MarkAsDead();
527+
result.Fail(message, e);
528+
return result;
529+
}
530+
531+
#endregion
532+
}
533+
534+
// free item pool is empty
535+
message = "Could not get a socket from the pool, Creating a new item. " + this.endPoint;
536+
if (_isDebugEnabled) _logger.LogDebug(message);
537+
538+
539+
try
540+
{
541+
// okay, create the new item
542+
var startTime = DateTime.Now;
543+
retval = await this.CreateSocketAsync();
544+
_logger.LogInformation("MemcachedAcquire-CreateSocket: {0}ms", (DateTime.Now - startTime).TotalMilliseconds);
545+
result.Value = retval;
546+
result.Pass();
547+
}
548+
catch (Exception e)
549+
{
550+
message = "Failed to create socket. " + this.endPoint;
551+
_logger.LogError(message, e);
552+
553+
// eventhough this item failed the failure policy may keep the pool alive
554+
// so we need to make sure to release the semaphore, so new connections can be
555+
// acquired or created (otherwise dead conenctions would "fill up" the pool
556+
// while the FP pretends that the pool is healthy)
557+
semaphore.Release();
558+
559+
this.MarkAsDead();
560+
result.Fail(message);
561+
return result;
562+
}
563+
564+
if (_isDebugEnabled) _logger.LogDebug("Done.");
565+
566+
return result;
567+
}
568+
390569
private void MarkAsDead()
391570
{
392571
if (_isDebugEnabled) _logger.LogDebug("Mark as dead was requested for {0}", _endPoint);
@@ -517,13 +696,30 @@ protected internal virtual PooledSocket CreateSocket()
517696
{
518697
try
519698
{
520-
return new PooledSocket(this.endPoint, this.config.ConnectionTimeout, this.config.ReceiveTimeout, _logger);
699+
var ps = new PooledSocket(this.endPoint, this.config.ConnectionTimeout, this.config.ReceiveTimeout, _logger);
700+
ps.Connect();
701+
return ps;
521702
}
522703
catch (Exception ex)
523704
{
524705
_logger.LogError(ex, $"Create {nameof(PooledSocket)}");
525706
throw;
526707
}
708+
709+
}
710+
protected internal virtual async Task<PooledSocket> CreateSocketAsync()
711+
{
712+
try
713+
{
714+
var ps = new PooledSocket(this.endPoint, this.config.ConnectionTimeout, this.config.ReceiveTimeout, _logger);
715+
await ps.ConnectAsync();
716+
return ps;
717+
}
718+
catch (Exception ex)
719+
{
720+
_logger.LogError(new EventId(this.GetHashCode(), nameof(MemcachedNode)), ex, $"Create {nameof(PooledSocket)}");
721+
throw;
722+
}
527723
}
528724

529725
//protected internal virtual PooledSocket CreateSocket(IPEndPoint endpoint, TimeSpan connectionTimeout, TimeSpan receiveTimeout)
@@ -581,11 +777,11 @@ protected virtual IPooledSocketResult ExecuteOperation(IOperation op)
581777

582778
}
583779

584-
protected async virtual Task<IPooledSocketResult> ExecuteOperationAsync(IOperation op)
780+
protected virtual async Task<IPooledSocketResult> ExecuteOperationAsync(IOperation op)
585781
{
586782
_logger.LogDebug($"ExecuteOperationAsync({op})");
587783

588-
var result = this.Acquire();
784+
var result = await this.AcquireAsync();
589785
if (result.Success && result.HasValue)
590786
{
591787
try
@@ -630,9 +826,9 @@ protected async virtual Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
630826

631827
}
632828

633-
protected virtual bool ExecuteOperationAsync(IOperation op, Action<bool> next)
829+
protected virtual async Task<bool> ExecuteOperationAsync(IOperation op, Action<bool> next)
634830
{
635-
var socket = this.Acquire().Value;
831+
var socket = (await this.AcquireAsync()).Value;
636832
if (socket == null) return false;
637833

638834
//key(string) to buffer(btye[])
@@ -698,7 +894,7 @@ async Task<IOperationResult> IMemcachedNode.ExecuteAsync(IOperation op)
698894

699895
bool IMemcachedNode.ExecuteAsync(IOperation op, Action<bool> next)
700896
{
701-
return this.ExecuteOperationAsync(op, next);
897+
return this.ExecuteOperationAsync(op, next).Result;
702898
}
703899

704900
event Action<IMemcachedNode> IMemcachedNode.Failed

0 commit comments

Comments
 (0)