Skip to content

Commit d42511e

Browse files
rosca-sabinaSabina Rosca
authored andcommitted
Added tests for topology recovery filter
1 parent 9a5d1d3 commit d42511e

File tree

2 files changed

+256
-0
lines changed

2 files changed

+256
-0
lines changed

projects/Unit/Fixtures.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,18 @@ internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyReco
151151
return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
152152
}
153153

154+
internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyRecoveryFilter(TopologyRecoveryFilter filter)
155+
{
156+
var cf = new ConnectionFactory
157+
{
158+
AutomaticRecoveryEnabled = true,
159+
TopologyRecoveryEnabled = true,
160+
TopologyRecoveryFilter = filter
161+
};
162+
163+
return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
164+
}
165+
154166
internal IConnection CreateNonRecoveringConnection()
155167
{
156168
var cf = new ConnectionFactory

projects/Unit/TestConnectionRecovery.cs

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
using System;
3333
using System.Collections;
3434
using System.Collections.Generic;
35+
using System.Text;
3536
using System.Threading;
3637

3738
using NUnit.Framework;
@@ -803,6 +804,7 @@ public void TestServerNamedQueueRecovery()
803804

804805
Model.QueueDeclarePassive(nameAfter);
805806
}
807+
806808
[Test]
807809
public void TestUnbindQueueAfterRecoveryConnection()
808810
{
@@ -1099,6 +1101,248 @@ public void TestUnblockedListenersRecovery()
10991101
Wait(latch);
11001102
}
11011103

1104+
[Test]
1105+
public void TestTopologyRecoveryQueueFilter()
1106+
{
1107+
var filter = new TopologyRecoveryFilter
1108+
{
1109+
QueueFilter = queue => !queue.Name.Contains("filtered")
1110+
};
1111+
var latch = new ManualResetEventSlim(false);
1112+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
1113+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1114+
IModel ch = conn.CreateModel();
1115+
1116+
var queueToRecover = "recovered.queue";
1117+
var queueToIgnore = "filtered.queue";
1118+
ch.QueueDeclare(queueToRecover, false, false, false, null);
1119+
ch.QueueDeclare(queueToIgnore, false, false, false, null);
1120+
1121+
Model.QueueDelete(queueToRecover);
1122+
Model.QueueDelete(queueToIgnore);
1123+
1124+
CloseAndWaitForRecovery(conn);
1125+
Wait(latch);
1126+
1127+
Assert.IsTrue(ch.IsOpen);
1128+
AssertQueueRecovery(ch, queueToRecover, false);
1129+
1130+
try
1131+
{
1132+
ch.QueueDeclarePassive(queueToIgnore);
1133+
Assert.Fail("Expected an exception");
1134+
}
1135+
catch (OperationInterruptedException e)
1136+
{
1137+
AssertShutdownError(e.ShutdownReason, 404);
1138+
}
1139+
}
1140+
1141+
[Test]
1142+
public void TestTopologyRecoveryExchangeFilter()
1143+
{
1144+
var filter = new TopologyRecoveryFilter
1145+
{
1146+
ExchangeFilter = exchange => exchange.Type == "topic" && !exchange.Name.Contains("filtered")
1147+
};
1148+
var latch = new ManualResetEventSlim(false);
1149+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
1150+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1151+
IModel ch = conn.CreateModel();
1152+
1153+
var exchangeToRecover = "recovered.exchange";
1154+
var exchangeToIgnore = "filtered.exchange";
1155+
ch.ExchangeDeclare(exchangeToRecover, "topic", false, true);
1156+
ch.ExchangeDeclare(exchangeToIgnore, "direct", false, true);
1157+
1158+
Model.ExchangeDelete(exchangeToRecover);
1159+
Model.ExchangeDelete(exchangeToIgnore);
1160+
1161+
CloseAndWaitForRecovery(conn);
1162+
Wait(latch);
1163+
1164+
Assert.IsTrue(ch.IsOpen);
1165+
AssertExchangeRecovery(ch, exchangeToRecover);
1166+
1167+
try
1168+
{
1169+
ch.ExchangeDeclarePassive(exchangeToIgnore);
1170+
Assert.Fail("Expected an exception");
1171+
}
1172+
catch (OperationInterruptedException e)
1173+
{
1174+
AssertShutdownError(e.ShutdownReason, 404);
1175+
}
1176+
}
1177+
1178+
[Test]
1179+
public void TestTopologyRecoveryBindingFilter()
1180+
{
1181+
var filter = new TopologyRecoveryFilter
1182+
{
1183+
BindingFilter = binding => !binding.RoutingKey.Contains("filtered")
1184+
};
1185+
var latch = new ManualResetEventSlim(false);
1186+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
1187+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1188+
IModel ch = conn.CreateModel();
1189+
1190+
var exchange = "topology.recovery.exchange";
1191+
var queueWithRecoveredBinding = "topology.recovery.queue.1";
1192+
var queueWithIgnoredBinding = "topology.recovery.queue.2";
1193+
var bindingToRecover = "recovered.binding";
1194+
var bindingToIgnore = "filtered.binding";
1195+
1196+
ch.ExchangeDeclare(exchange, "direct");
1197+
ch.QueueDeclare(queueWithRecoveredBinding, false, false, false, null);
1198+
ch.QueueDeclare(queueWithIgnoredBinding, false, false, false, null);
1199+
ch.QueueBind(queueWithRecoveredBinding, exchange, bindingToRecover);
1200+
ch.QueueBind(queueWithIgnoredBinding, exchange, bindingToIgnore);
1201+
ch.QueuePurge(queueWithRecoveredBinding);
1202+
ch.QueuePurge(queueWithIgnoredBinding);
1203+
1204+
Model.QueueUnbind(queueWithRecoveredBinding, exchange, bindingToRecover);
1205+
Model.QueueUnbind(queueWithIgnoredBinding, exchange, bindingToIgnore);
1206+
1207+
CloseAndWaitForRecovery(conn);
1208+
Wait(latch);
1209+
1210+
Assert.IsTrue(ch.IsOpen);
1211+
Assert.IsTrue(SendAndConsumeMessage(queueWithRecoveredBinding, exchange, bindingToRecover));
1212+
Assert.IsFalse(SendAndConsumeMessage(queueWithIgnoredBinding, exchange, bindingToIgnore));
1213+
}
1214+
1215+
[Test]
1216+
public void TestTopologyRecoveryConsumerFilter()
1217+
{
1218+
var filter = new TopologyRecoveryFilter
1219+
{
1220+
ConsumerFilter = consumer => !consumer.ConsumerTag.Contains("filtered")
1221+
};
1222+
var latch = new ManualResetEventSlim(false);
1223+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
1224+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1225+
IModel ch = conn.CreateModel();
1226+
ch.ConfirmSelect();
1227+
1228+
var exchange = "topology.recovery.exchange";
1229+
var queueWithRecoveredConsumer = "topology.recovery.queue.1";
1230+
var queueWithIgnoredConsumer = "topology.recovery.queue.2";
1231+
var binding1 = "recovered.binding";
1232+
var binding2 = "filtered.binding";
1233+
1234+
ch.ExchangeDeclare(exchange, "direct");
1235+
ch.QueueDeclare(queueWithRecoveredConsumer, false, false, false, null);
1236+
ch.QueueDeclare(queueWithIgnoredConsumer, false, false, false, null);
1237+
ch.QueueBind(queueWithRecoveredConsumer, exchange, binding1);
1238+
ch.QueueBind(queueWithIgnoredConsumer, exchange, binding2);
1239+
ch.QueuePurge(queueWithRecoveredConsumer);
1240+
ch.QueuePurge(queueWithIgnoredConsumer);
1241+
1242+
var recoverLatch = new ManualResetEventSlim(false);
1243+
var consumerToRecover = new EventingBasicConsumer(ch);
1244+
consumerToRecover.Received += (source, ea) => recoverLatch.Set();
1245+
ch.BasicConsume(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover);
1246+
1247+
var ignoredLatch = new ManualResetEventSlim(false);
1248+
var consumerToIgnore = new EventingBasicConsumer(ch);
1249+
consumerToIgnore.Received += (source, ea) => ignoredLatch.Set();
1250+
ch.BasicConsume(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore);
1251+
1252+
CloseAndWaitForRecovery(conn);
1253+
Wait(latch);
1254+
1255+
Assert.IsTrue(ch.IsOpen);
1256+
ch.BasicPublish(exchange, binding1, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message"));
1257+
ch.BasicPublish(exchange, binding2, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message"));
1258+
1259+
Assert.IsTrue(recoverLatch.Wait(TimeSpan.FromSeconds(5)));
1260+
Assert.IsFalse(ignoredLatch.Wait(TimeSpan.FromSeconds(5)));
1261+
1262+
ch.BasicConsume(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore);
1263+
1264+
try
1265+
{
1266+
ch.BasicConsume(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover);
1267+
Assert.Fail("Expected an exception");
1268+
}
1269+
catch (OperationInterruptedException e)
1270+
{
1271+
AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag
1272+
}
1273+
}
1274+
1275+
[Test]
1276+
public void TestTopologyRecoveryDefaultFilterRecoversAllEntities()
1277+
{
1278+
var filter = new TopologyRecoveryFilter();
1279+
var latch = new ManualResetEventSlim(false);
1280+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
1281+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1282+
IModel ch = conn.CreateModel();
1283+
ch.ConfirmSelect();
1284+
1285+
var exchange = "topology.recovery.exchange";
1286+
var queue1 = "topology.recovery.queue.1";
1287+
var queue2 = "topology.recovery.queue.2";
1288+
var binding1 = "recovered.binding";
1289+
var binding2 = "filtered.binding";
1290+
1291+
ch.ExchangeDeclare(exchange, "direct");
1292+
ch.QueueDeclare(queue1, false, false, false, null);
1293+
ch.QueueDeclare(queue2, false, false, false, null);
1294+
ch.QueueBind(queue1, exchange, binding1);
1295+
ch.QueueBind(queue2, exchange, binding2);
1296+
ch.QueuePurge(queue1);
1297+
ch.QueuePurge(queue2);
1298+
1299+
var consumerLatch1 = new ManualResetEventSlim(false);
1300+
var consumer1 = new EventingBasicConsumer(ch);
1301+
consumer1.Received += (source, ea) => consumerLatch1.Set();
1302+
ch.BasicConsume(queue1, true, "recovered.consumer", consumer1);
1303+
1304+
var consumerLatch2 = new ManualResetEventSlim(false);
1305+
var consumer2 = new EventingBasicConsumer(ch);
1306+
consumer2.Received += (source, ea) => consumerLatch2.Set();
1307+
ch.BasicConsume(queue2, true, "filtered.consumer", consumer2);
1308+
1309+
Model.ExchangeDelete(exchange);
1310+
Model.QueueDelete(queue1);
1311+
Model.QueueDelete(queue2);
1312+
1313+
CloseAndWaitForRecovery(conn);
1314+
Wait(latch);
1315+
1316+
Assert.IsTrue(ch.IsOpen);
1317+
AssertExchangeRecovery(ch, exchange);
1318+
ch.QueueDeclarePassive(queue1);
1319+
ch.QueueDeclarePassive(queue2);
1320+
1321+
ch.BasicPublish(exchange, binding1, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message"));
1322+
ch.BasicPublish(exchange, binding2, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message"));
1323+
1324+
Assert.IsTrue(consumerLatch1.Wait(TimeSpan.FromSeconds(5)));
1325+
Assert.IsTrue(consumerLatch2.Wait(TimeSpan.FromSeconds(5)));
1326+
}
1327+
1328+
internal bool SendAndConsumeMessage(string queue, string exchange, string routingKey)
1329+
{
1330+
using (var ch = Conn.CreateModel())
1331+
{
1332+
ch.ConfirmSelect();
1333+
var latch = new ManualResetEventSlim(false);
1334+
1335+
var consumer = new AckingBasicConsumer(ch, 1, latch);
1336+
1337+
ch.BasicConsume(queue, true, consumer);
1338+
1339+
ch.BasicPublish(exchange, routingKey, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message"));
1340+
ch.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
1341+
1342+
return latch.Wait(TimeSpan.FromSeconds(5));
1343+
}
1344+
}
1345+
11021346
internal void AssertExchangeRecovery(IModel m, string x)
11031347
{
11041348
m.ConfirmSelect();

0 commit comments

Comments
 (0)