77using System . Threading . Tasks ;
88using Xunit ;
99using Xunit . Abstractions ;
10-
10+ using System . Collections . Concurrent ;
11+ using System . Reflection ;
12+ using System . Runtime . CompilerServices ;
13+
1114namespace BitFaster . Caching . UnitTests . Lru
1215{
1316 public class ConcurrentLruTests
@@ -1088,19 +1091,149 @@ public void WhenItemsAreTrimmedAnEventIsFired()
10881091 }
10891092
10901093 [ Fact ]
1091- public async Task WhenItemsAreScannedInParallelCapacityIsNotExceeded ( )
1094+ public async Task WhenSoakConcurrentGetCacheEndsInConsistentState ( )
10921095 {
1093- await Threaded . Run ( 4 , ( ) => {
1094- for ( int i = 0 ; i < 100000 ; i ++ )
1095- {
1096- lru . GetOrAdd ( i + 1 , i => i . ToString ( ) ) ;
1097- }
1098- } ) ;
1096+ for ( int i = 0 ; i < 10 ; i ++ )
1097+ {
1098+ await Threaded . Run ( 4 , ( ) => {
1099+ for ( int i = 0 ; i < 100000 ; i ++ )
1100+ {
1101+ lru . GetOrAdd ( i + 1 , i => i . ToString ( ) ) ;
1102+ }
1103+ } ) ;
10991104
1100- this . testOutputHelper . WriteLine ( $ "{ lru . HotCount } { lru . WarmCount } { lru . ColdCount } ") ;
1105+ this . testOutputHelper . WriteLine ( $ "{ lru . HotCount } { lru . WarmCount } { lru . ColdCount } ") ;
1106+ this . testOutputHelper . WriteLine ( string . Join ( " " , lru . Keys ) ) ;
11011107
1102- // allow +/- 1 variance for capacity
1103- lru . Count . Should ( ) . BeCloseTo ( 9 , 1 ) ;
1108+ // allow +/- 1 variance for capacity
1109+ lru . Count . Should ( ) . BeCloseTo ( 9 , 1 ) ;
1110+ RunIntegrityCheck ( ) ;
1111+ }
1112+ }
1113+
1114+ [ Fact ]
1115+ public async Task WhenSoakConcurrentGetAsyncCacheEndsInConsistentState ( )
1116+ {
1117+ for ( int i = 0 ; i < 10 ; i ++ )
1118+ {
1119+ await Threaded . RunAsync ( 4 , async ( ) => {
1120+ for ( int i = 0 ; i < 100000 ; i ++ )
1121+ {
1122+ await lru . GetOrAddAsync ( i + 1 , i => Task . FromResult ( i . ToString ( ) ) ) ;
1123+ }
1124+ } ) ;
1125+
1126+ this . testOutputHelper . WriteLine ( $ "{ lru . HotCount } { lru . WarmCount } { lru . ColdCount } ") ;
1127+ this . testOutputHelper . WriteLine ( string . Join ( " " , lru . Keys ) ) ;
1128+
1129+ // allow +/- 1 variance for capacity
1130+ lru . Count . Should ( ) . BeCloseTo ( 9 , 1 ) ;
1131+ RunIntegrityCheck ( ) ;
1132+ }
1133+ }
1134+
1135+ [ Fact ]
1136+ public async Task WhenSoakConcurrentGetWithArgCacheEndsInConsistentState ( )
1137+ {
1138+ for ( int i = 0 ; i < 10 ; i ++ )
1139+ {
1140+ await Threaded . Run ( 4 , ( ) => {
1141+ for ( int i = 0 ; i < 100000 ; i ++ )
1142+ {
1143+ // use the arg overload
1144+ lru . GetOrAdd ( i + 1 , ( i , s ) => i . ToString ( ) , "Foo" ) ;
1145+ }
1146+ } ) ;
1147+
1148+ this . testOutputHelper . WriteLine ( $ "{ lru . HotCount } { lru . WarmCount } { lru . ColdCount } ") ;
1149+ this . testOutputHelper . WriteLine ( string . Join ( " " , lru . Keys ) ) ;
1150+
1151+ // allow +/- 1 variance for capacity
1152+ lru . Count . Should ( ) . BeCloseTo ( 9 , 1 ) ;
1153+ RunIntegrityCheck ( ) ;
1154+ }
1155+ }
1156+
1157+ [ Fact ]
1158+ public async Task WhenSoakConcurrentGetAsyncWithArgCacheEndsInConsistentState ( )
1159+ {
1160+ for ( int i = 0 ; i < 10 ; i ++ )
1161+ {
1162+ await Threaded . RunAsync ( 4 , async ( ) => {
1163+ for ( int i = 0 ; i < 100000 ; i ++ )
1164+ {
1165+ // use the arg overload
1166+ await lru . GetOrAddAsync ( i + 1 , ( i , s ) => Task . FromResult ( i . ToString ( ) ) , "Foo" ) ;
1167+ }
1168+ } ) ;
1169+
1170+ this . testOutputHelper . WriteLine ( $ "{ lru . HotCount } { lru . WarmCount } { lru . ColdCount } ") ;
1171+ this . testOutputHelper . WriteLine ( string . Join ( " " , lru . Keys ) ) ;
1172+
1173+ // allow +/- 1 variance for capacity
1174+ lru . Count . Should ( ) . BeCloseTo ( 9 , 1 ) ;
1175+ RunIntegrityCheck ( ) ;
1176+ }
1177+ }
1178+
1179+ [ Fact ]
1180+ public async Task WhenSoakConcurrentGetAndRemoveCacheEndsInConsistentState ( )
1181+ {
1182+ for ( int i = 0 ; i < 10 ; i ++ )
1183+ {
1184+ await Threaded . Run ( 4 , ( ) => {
1185+ for ( int i = 0 ; i < 100000 ; i ++ )
1186+ {
1187+ lru . TryRemove ( i + 1 ) ;
1188+ lru . GetOrAdd ( i + 1 , i => i . ToString ( ) ) ;
1189+ }
1190+ } ) ;
1191+
1192+ this . testOutputHelper . WriteLine ( $ "{ lru . HotCount } { lru . WarmCount } { lru . ColdCount } ") ;
1193+ this . testOutputHelper . WriteLine ( string . Join ( " " , lru . Keys ) ) ;
1194+
1195+ RunIntegrityCheck ( ) ;
1196+ }
1197+ }
1198+
1199+ [ Fact ]
1200+ public async Task WhenSoakConcurrentGetAndUpdateCacheEndsInConsistentState ( )
1201+ {
1202+ for ( int i = 0 ; i < 10 ; i ++ )
1203+ {
1204+ await Threaded . Run ( 4 , ( ) => {
1205+ for ( int i = 0 ; i < 100000 ; i ++ )
1206+ {
1207+ lru . TryUpdate ( i + 1 , i . ToString ( ) ) ;
1208+ lru . GetOrAdd ( i + 1 , i => i . ToString ( ) ) ;
1209+ }
1210+ } ) ;
1211+
1212+ this . testOutputHelper . WriteLine ( $ "{ lru . HotCount } { lru . WarmCount } { lru . ColdCount } ") ;
1213+ this . testOutputHelper . WriteLine ( string . Join ( " " , lru . Keys ) ) ;
1214+
1215+ RunIntegrityCheck ( ) ;
1216+ }
1217+ }
1218+
1219+ [ Fact ]
1220+ public async Task WhenSoakConcurrentGetAndAddCacheEndsInConsistentState ( )
1221+ {
1222+ for ( int i = 0 ; i < 10 ; i ++ )
1223+ {
1224+ await Threaded . Run ( 4 , ( ) => {
1225+ for ( int i = 0 ; i < 100000 ; i ++ )
1226+ {
1227+ lru . AddOrUpdate ( i + 1 , i . ToString ( ) ) ;
1228+ lru . GetOrAdd ( i + 1 , i => i . ToString ( ) ) ;
1229+ }
1230+ } ) ;
1231+
1232+ this . testOutputHelper . WriteLine ( $ "{ lru . HotCount } { lru . WarmCount } { lru . ColdCount } ") ;
1233+ this . testOutputHelper . WriteLine ( string . Join ( " " , lru . Keys ) ) ;
1234+
1235+ RunIntegrityCheck ( ) ;
1236+ }
11041237 }
11051238
11061239 private void Warmup ( )
@@ -1115,5 +1248,75 @@ private void Warmup()
11151248 lru . GetOrAdd ( - 8 , valueFactory . Create ) ;
11161249 lru . GetOrAdd ( - 9 , valueFactory . Create ) ;
11171250 }
1251+
1252+ private void RunIntegrityCheck ( )
1253+ {
1254+ new ConcurrentLruIntegrityChecker < int , string , LruItem < int , string > , LruPolicy < int , string > , TelemetryPolicy < int , string > > ( this . lru ) . Validate ( ) ;
1255+ }
1256+ }
1257+
1258+ public class ConcurrentLruIntegrityChecker < K , V , I , P , T >
1259+ where I : LruItem < K , V >
1260+ where P : struct , IItemPolicy < K , V , I >
1261+ where T : struct , ITelemetryPolicy < K , V >
1262+ {
1263+ private readonly ConcurrentLruCore < K , V , I , P , T > cache ;
1264+
1265+ private readonly ConcurrentQueue < I > hotQueue ;
1266+ private readonly ConcurrentQueue < I > warmQueue ;
1267+ private readonly ConcurrentQueue < I > coldQueue ;
1268+
1269+ private static FieldInfo hotQueueField = typeof ( ConcurrentLruCore < K , V , I , P , T > ) . GetField ( "hotQueue" , BindingFlags . NonPublic | BindingFlags . Instance ) ;
1270+ private static FieldInfo warmQueueField = typeof ( ConcurrentLruCore < K , V , I , P , T > ) . GetField ( "warmQueue" , BindingFlags . NonPublic | BindingFlags . Instance ) ;
1271+ private static FieldInfo coldQueueField = typeof ( ConcurrentLruCore < K , V , I , P , T > ) . GetField ( "coldQueue" , BindingFlags . NonPublic | BindingFlags . Instance ) ;
1272+
1273+ public ConcurrentLruIntegrityChecker ( ConcurrentLruCore < K , V , I , P , T > cache )
1274+ {
1275+ this . cache = cache ;
1276+
1277+ // get queues via reflection
1278+ this . hotQueue = ( ConcurrentQueue < I > ) hotQueueField . GetValue ( cache ) ;
1279+ this . warmQueue = ( ConcurrentQueue < I > ) warmQueueField . GetValue ( cache ) ;
1280+ this . coldQueue = ( ConcurrentQueue < I > ) coldQueueField . GetValue ( cache ) ;
1281+ }
1282+
1283+ public void Validate ( )
1284+ {
1285+ // queue counters must be consistent with queues
1286+ this . hotQueue . Count . Should ( ) . Be ( cache . HotCount , "hot queue has a corrupted count" ) ;
1287+ this . warmQueue . Count . Should ( ) . Be ( cache . WarmCount , "warm queue has a corrupted count" ) ;
1288+ this . coldQueue . Count . Should ( ) . Be ( cache . ColdCount , "cold queue has a corrupted count" ) ;
1289+
1290+ // cache contents must be consistent with queued items
1291+ ValidateQueue ( cache , this . hotQueue , "hot" ) ;
1292+ ValidateQueue ( cache , this . warmQueue , "warm" ) ;
1293+ ValidateQueue ( cache , this . coldQueue , "cold" ) ;
1294+
1295+ // cache must be within capacity
1296+ cache . Count . Should ( ) . BeLessThanOrEqualTo ( cache . Capacity + 1 , "capacity out of valid range" ) ;
1297+ }
1298+
1299+ private void ValidateQueue ( ConcurrentLruCore < K , V , I , P , T > cache , ConcurrentQueue < I > queue , string queueName )
1300+ {
1301+ foreach ( var item in queue )
1302+ {
1303+ if ( item . WasRemoved )
1304+ {
1305+ // It is possible for the queues to contain 2 (or more) instances of the same key/item. One that was removed,
1306+ // and one that was added after the other was removed.
1307+ // In this case, the dictionary may contain the value only if the queues contain an entry for that key marked as WasRemoved == false.
1308+ if ( cache . TryGet ( item . Key , out var value ) )
1309+ {
1310+ hotQueue . Union ( warmQueue ) . Union ( coldQueue )
1311+ . Any ( i => i . Key . Equals ( item . Key ) && ! i . WasRemoved )
1312+ . Should ( ) . BeTrue ( $ "{ queueName } removed item { item . Key } was not removed") ;
1313+ }
1314+ }
1315+ else
1316+ {
1317+ cache . TryGet ( item . Key , out var value ) . Should ( ) . BeTrue ( $ "{ queueName } item { item . Key } was not present") ;
1318+ }
1319+ }
1320+ }
11181321 }
11191322}
0 commit comments