Skip to content

Commit 2760e08

Browse files
authored
Mitigate lock convoys when AtomicFactory throws (#477)
1 parent 47a547e commit 2760e08

File tree

2 files changed

+155
-11
lines changed

2 files changed

+155
-11
lines changed

BitFaster.Caching.UnitTests/Atomic/AtomicFactoryTests.cs

Lines changed: 110 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11

2+
using System;
23
using System.Threading;
34
using System.Threading.Tasks;
45
using BitFaster.Caching.Atomic;
@@ -118,7 +119,9 @@ public void WhenArgObjectValuesAreSameEqualsTrue()
118119
[Fact]
119120
public async Task WhenCallersRunConcurrentlyResultIsFromWinner()
120121
{
121-
var enter = new ManualResetEvent(false);
122+
var enter1 = new ManualResetEvent(false);
123+
var enter2 = new ManualResetEvent(false);
124+
var factory = new ManualResetEvent(false);
122125
var resume = new ManualResetEvent(false);
123126

124127
var atomicFactory = new AtomicFactory<int, int>();
@@ -127,9 +130,10 @@ public async Task WhenCallersRunConcurrentlyResultIsFromWinner()
127130

128131
Task<int> first = Task.Run(() =>
129132
{
133+
enter1.Set();
130134
return atomicFactory.GetValue(1, k =>
131135
{
132-
enter.Set();
136+
factory.Set();
133137
resume.WaitOne();
134138

135139
result = 1;
@@ -140,9 +144,10 @@ public async Task WhenCallersRunConcurrentlyResultIsFromWinner()
140144

141145
Task<int> second = Task.Run(() =>
142146
{
147+
enter2.Set();
143148
return atomicFactory.GetValue(1, k =>
144149
{
145-
enter.Set();
150+
factory.Set();
146151
resume.WaitOne();
147152

148153
result = 2;
@@ -151,13 +156,114 @@ public async Task WhenCallersRunConcurrentlyResultIsFromWinner()
151156
});
152157
});
153158

154-
enter.WaitOne();
159+
enter1.WaitOne();
160+
enter2.WaitOne();
161+
factory.WaitOne();
155162
resume.Set();
156163

157164
(await first).Should().Be(result);
158165
(await second).Should().Be(result);
159166

160167
winnerCount.Should().Be(1);
161168
}
169+
170+
[Fact]
171+
public async Task WhenCallersRunConcurrentlyAndFailExceptionIsPropogated()
172+
{
173+
var enter1 = new ManualResetEvent(false);
174+
var enter2 = new ManualResetEvent(false);
175+
var factory = new ManualResetEvent(false);
176+
var resume = new ManualResetEvent(false);
177+
178+
var atomicFactory = new AtomicFactory<int, int>();
179+
var throwCount = 0;
180+
181+
Task<int> first = Task.Run(() =>
182+
{
183+
enter1.Set();
184+
return atomicFactory.GetValue(1, k =>
185+
{
186+
factory.Set();
187+
resume.WaitOne();
188+
189+
Interlocked.Increment(ref throwCount);
190+
throw new Exception();
191+
});
192+
});
193+
194+
Task<int> second = Task.Run(() =>
195+
{
196+
enter2.Set();
197+
return atomicFactory.GetValue(1, k =>
198+
{
199+
factory.Set();
200+
resume.WaitOne();
201+
202+
Interlocked.Increment(ref throwCount);
203+
throw new Exception();
204+
});
205+
});
206+
207+
enter1.WaitOne();
208+
enter2.WaitOne();
209+
factory.WaitOne();
210+
resume.Set();
211+
212+
Func<Task> act1 = () => first;
213+
Func<Task> act2 = () => second;
214+
215+
await act1.Should().ThrowAsync<Exception>();
216+
await act2.Should().ThrowAsync<Exception>();
217+
218+
// verify only one exception was thrown
219+
throwCount.Should().Be(1);
220+
}
221+
222+
[Fact]
223+
public async Task WhenCallersRunConcurrentlyAndFailNewCallerStartsClean()
224+
{
225+
var enter1 = new ManualResetEvent(false);
226+
var enter2 = new ManualResetEvent(false);
227+
var factory = new ManualResetEvent(false);
228+
var resume = new ManualResetEvent(false);
229+
230+
var atomicFactory = new AtomicFactory<int, int>();
231+
232+
Task<int> first = Task.Run(() =>
233+
{
234+
enter1.Set();
235+
return atomicFactory.GetValue(1, k =>
236+
{
237+
factory.Set();
238+
resume.WaitOne();
239+
throw new Exception();
240+
});
241+
});
242+
243+
Task<int> second = Task.Run(() =>
244+
{
245+
enter2.Set();
246+
return atomicFactory.GetValue(1, k =>
247+
{
248+
factory.Set();
249+
resume.WaitOne();
250+
throw new Exception();
251+
});
252+
});
253+
254+
enter1.WaitOne();
255+
enter2.WaitOne();
256+
factory.WaitOne();
257+
resume.Set();
258+
259+
Func<Task> act1 = () => first;
260+
Func<Task> act2 = () => second;
261+
262+
await act1.Should().ThrowAsync<Exception>();
263+
await act2.Should().ThrowAsync<Exception>();
264+
265+
// verify exception is no longer cached
266+
atomicFactory.GetValue(1, k => k).Should().Be(1);
267+
}
162268
}
163269
}

BitFaster.Caching/Atomic/AtomicFactory.cs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Diagnostics;
4+
using System.Runtime.ExceptionServices;
45
using System.Threading;
56

67
namespace BitFaster.Caching.Atomic
78
{
89
/// <summary>
9-
/// A class that provides simple, lightweight exactly once initialization for values
10-
/// stored in a cache.
10+
/// A class that provides simple, lightweight exactly once initialization for values stored
11+
/// in a cache. Exceptions are propogated to the caller.
1112
/// </summary>
1213
/// <typeparam name="K">The type of the key.</typeparam>
1314
/// <typeparam name="V">The type of the value.</typeparam>
@@ -92,14 +93,35 @@ public V ValueIfCreated
9293
}
9394
}
9495

96+
/// <summary>
97+
/// Note the failure case works like this:
98+
/// 1. Thread A enters AtomicFactory.CreateValue then Initializer.CreateValue and holds the lock.
99+
/// 2. Thread B enters AtomicFactory.CreateValue then Initializer.CreateValue and queues on the lock.
100+
/// 3. Thread A calls value factory, and after 1 second throws an exception. The exception is
101+
/// captured in exceptionDispatch, lock is released, and an exeption is thrown.
102+
/// 4. AtomicFactory.CreateValue catches the exception and creates a fresh initializer.
103+
/// 5. Thread B enters the lock, finds exceptionDispatch is populated and immediately throws.
104+
/// 6. Thread C can now start from a clean state.
105+
/// This mitigates lock convoys where many queued threads will fail slowly one by one, introducing delays
106+
/// and multiplying the number of calls to the failing resource.
107+
/// </summary>
95108
private V CreateValue<TFactory>(K key, TFactory valueFactory) where TFactory : struct, IValueFactory<K, V>
96109
{
97110
var init = Volatile.Read(ref initializer);
98111

99112
if (init != null)
100113
{
101-
value = init.CreateValue(key, valueFactory);
102-
Volatile.Write(ref initializer, null); // volatile write must occur after setting value
114+
try
115+
{
116+
value = init.CreateValue(key, valueFactory);
117+
Volatile.Write(ref initializer, null); // volatile write must occur after setting value
118+
}
119+
catch
120+
{
121+
// Overwrite the initializer with a fresh copy. New threads will start from a clean state.
122+
Volatile.Write(ref initializer, new Initializer());
123+
throw;
124+
}
103125
}
104126

105127
return value;
@@ -138,6 +160,7 @@ private class Initializer
138160
{
139161
private bool isInitialized;
140162
private V value;
163+
private ExceptionDispatchInfo exceptionDispatch;
141164

142165
public V CreateValue<TFactory>(K key, TFactory valueFactory) where TFactory : struct, IValueFactory<K, V>
143166
{
@@ -148,9 +171,24 @@ public V CreateValue<TFactory>(K key, TFactory valueFactory) where TFactory : st
148171
return value;
149172
}
150173

151-
value = valueFactory.Create(key);
152-
isInitialized = true;
153-
return value;
174+
// If a previous thread called the factory and failed, throw the same error instead
175+
// of calling the factory again.
176+
if (exceptionDispatch != null)
177+
{
178+
exceptionDispatch.Throw();
179+
}
180+
181+
try
182+
{
183+
value = valueFactory.Create(key);
184+
isInitialized = true;
185+
return value;
186+
}
187+
catch (Exception ex)
188+
{
189+
exceptionDispatch = ExceptionDispatchInfo.Capture(ex);
190+
throw;
191+
}
154192
}
155193
}
156194
}

0 commit comments

Comments
 (0)