Skip to content

Commit 1ec0ef2

Browse files
authored
lock contention (#231)
* lock contention * fix tests * comment * test * foreground * bkgnd * thread test * 1buffer * null * fix * test * afterwrite * fix tests * relax * rem comment
1 parent 35a214f commit 1ec0ef2

File tree

3 files changed

+101
-16
lines changed

3 files changed

+101
-16
lines changed

BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,71 @@ public void VerifyHitsWithForegroundScheduler()
740740
VerifyHits(iterations: iterations + dropped, minSamples: iterations);
741741
}
742742

743+
[Fact]
744+
public void VerifyMisses()
745+
{
746+
cache = new ConcurrentLfu<int, int>(1, 20, new BackgroundThreadScheduler(), EqualityComparer<int>.Default,
747+
new LfuBufferSize(new StripedBufferSize(1, 1), new StripedBufferSize(1, 1)));
748+
749+
int iterations = 100000;
750+
Func<int, int> func = x => x;
751+
752+
var start = Stopwatch.GetTimestamp();
753+
754+
for (int i = 0; i < iterations; i++)
755+
{
756+
cache.GetOrAdd(i, func);
757+
}
758+
759+
var end = Stopwatch.GetTimestamp();
760+
761+
cache.PendingMaintenance();
762+
763+
var totalTicks = end - start;
764+
var timeMs = ((double)totalTicks / Stopwatch.Frequency) * 1000.0;
765+
var timeNs = timeMs / 1000000;
766+
767+
var timePerOp = timeMs / (double)iterations;
768+
var samplePercent = this.cache.Metrics.Value.Misses / (double)iterations * 100;
769+
770+
this.output.WriteLine($"Elapsed {timeMs}ms - {timeNs}ns/op");
771+
this.output.WriteLine($"Cache misses {this.cache.Metrics.Value.Misses} (sampled {samplePercent}%)");
772+
this.output.WriteLine($"Maintenance ops {this.cache.Scheduler.RunCount}");
773+
774+
cache.Metrics.Value.Misses.Should().Be(iterations);
775+
}
776+
777+
[Fact]
778+
public async Task ThreadedVerifyMisses()
779+
{
780+
// buffer size is 1, this will cause dropped writes on some threads where the buffer is full
781+
cache = new ConcurrentLfu<int, int>(1, 20, new NullScheduler(), EqualityComparer<int>.Default,
782+
new LfuBufferSize(new StripedBufferSize(1, 1), new StripedBufferSize(1, 1)));
783+
784+
int threads = 4;
785+
int iterations = 100000;
786+
787+
await Threaded.Run(threads, i =>
788+
{
789+
Func<int, int> func = x => x;
790+
791+
int start = i * iterations;
792+
793+
for (int j = start; j < start + iterations; j++)
794+
{
795+
cache.GetOrAdd(j, func);
796+
}
797+
});
798+
799+
var samplePercent = this.cache.Metrics.Value.Misses / (double)iterations / threads * 100;
800+
801+
this.output.WriteLine($"Cache misses {this.cache.Metrics.Value.Misses} (sampled {samplePercent}%)");
802+
this.output.WriteLine($"Maintenance ops {this.cache.Scheduler.RunCount}");
803+
804+
cache.Metrics.Value.Misses.Should().Be(iterations * threads);
805+
cache.Count.Should().BeCloseTo(20, 1);
806+
}
807+
743808
private void VerifyHits(int iterations, int minSamples)
744809
{
745810
Func<int, int> func = x => x;

BitFaster.Caching.UnitTests/Threaded.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,23 @@ namespace BitFaster.Caching.UnitTests
99
{
1010
public class Threaded
1111
{
12-
public static async Task Run(int threadCount, Action action)
12+
public static Task Run(int threadCount, Action action)
13+
{
14+
return Run(threadCount, i => action());
15+
}
16+
17+
public static async Task Run(int threadCount, Action<int> action)
1318
{
1419
var tasks = new Task[threadCount];
1520
ManualResetEvent mre = new ManualResetEvent(false);
1621

1722
for (int i = 0; i < threadCount; i++)
1823
{
24+
int run = i;
1925
tasks[i] = Task.Run(() =>
2026
{
2127
mre.WaitOne();
22-
action();
28+
action(run);
2329
});
2430
}
2531

BitFaster.Caching/Lfu/ConcurrentLfu.cs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ namespace BitFaster.Caching.Lfu
3434
[DebuggerDisplay("Count = {Count}/{Capacity}")]
3535
public sealed class ConcurrentLfu<K, V> : ICache<K, V>, IAsyncCache<K, V>, IBoundedPolicy
3636
{
37-
private const int MaxWriteBufferRetries = 16;
37+
private const int MaxWriteBufferRetries = 64;
3838

3939
private readonly ConcurrentDictionary<K, LfuNode<K, V>> dictionary;
4040

@@ -308,8 +308,6 @@ private static void TakeCandidatesInLruOrder(LfuNodeList<K, V> lru, List<LfuNode
308308

309309
private void AfterWrite(LfuNode<K, V> node)
310310
{
311-
var spinner = new SpinWait();
312-
313311
for (int i = 0; i < MaxWriteBufferRetries; i++)
314312
{
315313
if (writeBuffer.TryAdd(node) == BufferStatus.Success)
@@ -319,19 +317,31 @@ private void AfterWrite(LfuNode<K, V> node)
319317
}
320318

321319
TryScheduleDrain();
322-
323-
spinner.SpinOnce();
324320
}
325321

326322
lock (this.maintenanceLock)
327323
{
324+
// aggressively try to exit the lock early before doing full maintenance
325+
var status = BufferStatus.Contended;
326+
while (status != BufferStatus.Full)
327+
{
328+
status = writeBuffer.TryAdd(node);
329+
330+
if (status == BufferStatus.Success)
331+
{
332+
ScheduleAfterWrite();
333+
return;
334+
}
335+
}
336+
328337
// if the write was dropped from the buffer, explicitly pass it to maintenance
329338
Maintenance(node);
330339
}
331340
}
332341

333342
private void ScheduleAfterWrite()
334343
{
344+
var spinner = new SpinWait();
335345
while (true)
336346
{
337347
switch (this.drainStatus.Status())
@@ -352,6 +362,7 @@ private void ScheduleAfterWrite()
352362
case DrainStatus.ProcessingToRequired:
353363
return;
354364
}
365+
spinner.SpinOnce();
355366
}
356367
}
357368

@@ -424,29 +435,32 @@ private bool Maintenance(LfuNode<K, V> droppedWrite = null)
424435
var localDrainBuffer = RentDrainBuffer();
425436

426437
// extract to a buffer before doing book keeping work, ~2x faster
427-
var count = readBuffer.DrainTo(localDrainBuffer);
438+
int readCount = readBuffer.DrainTo(localDrainBuffer);
428439

429-
for (int i = 0; i < count; i++)
440+
for (int i = 0; i < readCount; i++)
430441
{
431442
this.cmSketch.Increment(localDrainBuffer[i].Key);
432443
}
433444

434-
for (int i = 0; i < count; i++)
445+
for (int i = 0; i < readCount; i++)
435446
{
436447
OnAccess(localDrainBuffer[i]);
437448
}
438449

439-
var wasDrained = count == 0;
440-
count = this.writeBuffer.DrainTo(localDrainBuffer);
450+
int writeCount = this.writeBuffer.DrainTo(localDrainBuffer);
441451

442-
for (int i = 0; i < count; i++)
452+
for (int i = 0; i < writeCount; i++)
443453
{
444454
OnWrite(localDrainBuffer[i]);
445455
}
446456

457+
// we are done only when both buffers are empty
458+
var done = readCount == 0 & writeCount == 0;
459+
447460
if (droppedWrite != null)
448461
{
449462
OnWrite(droppedWrite);
463+
done = true;
450464
}
451465

452466
ReturnDrainBuffer(localDrainBuffer);
@@ -458,14 +472,14 @@ private bool Maintenance(LfuNode<K, V> droppedWrite = null)
458472
// Reset to idle if either
459473
// 1. We drained both input buffers (all work done)
460474
// 2. or scheduler is foreground (since don't run continuously on the foreground)
461-
if ((wasDrained || !scheduler.IsBackground) &&
475+
if ((done || !scheduler.IsBackground) &&
462476
(this.drainStatus.Status() != DrainStatus.ProcessingToIdle ||
463477
!this.drainStatus.Cas(DrainStatus.ProcessingToIdle, DrainStatus.Idle)))
464478
{
465479
this.drainStatus.Set(DrainStatus.Required);
466480
}
467481

468-
return wasDrained;
482+
return done;
469483
}
470484

471485
private void OnAccess(LfuNode<K, V> node)
@@ -595,7 +609,7 @@ private void EvictFromMain(LfuNode<K, V> candidate)
595609
while (this.windowLru.Count + this.probationLru.Count + this.protectedLru.Count > this.Capacity)
596610
{
597611
// bail when we run out of options
598-
if (candidate == null || victim == null || victim == candidate)
612+
if (candidate == null | victim == null | victim == candidate)
599613
{
600614
break;
601615
}

0 commit comments

Comments
 (0)