Skip to content

Commit 9a5d1d3

Browse files
rosca-sabinaSabina Rosca
authored andcommitted
Added topology recovery filter
1 parent c351467 commit 9a5d1d3

12 files changed

+204
-42
lines changed

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,12 @@ public TimeSpan ContinuationTimeout
271271
/// </summary>
272272
public bool TopologyRecoveryEnabled { get; set; } = true;
273273

274+
/// <summary>
275+
/// Filter to include/exclude entities from topology recovery.
276+
/// Default filter includes all entities in topology recovery.
277+
/// </summary>
278+
public TopologyRecoveryFilter TopologyRecoveryFilter { get; set; } = new TopologyRecoveryFilter();
279+
274280
/// <summary>
275281
/// Construct a fresh instance, with all fields set to their respective defaults.
276282
/// </summary>
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
public interface IRecordedBinding
6+
{
7+
string Source { get; }
8+
9+
string Destination { get; }
10+
11+
string RoutingKey { get; }
12+
13+
IDictionary<string, object> Arguments { get; }
14+
}
15+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
public interface IRecordedConsumer
6+
{
7+
string ConsumerTag { get; }
8+
9+
string Queue { get; }
10+
11+
bool AutoAck { get; }
12+
13+
bool Exclusive { get; }
14+
15+
IDictionary<string, object> Arguments { get; }
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
public interface IRecordedExchange
6+
{
7+
string Name { get; }
8+
9+
string Type { get; }
10+
11+
bool Durable { get; }
12+
13+
bool AutoDelete { get; }
14+
15+
IDictionary<string, object> Arguments { get; }
16+
}
17+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
public interface IRecordedQueue
6+
{
7+
string Name { get; }
8+
9+
bool Durable { get; }
10+
11+
bool Exclusive { get; }
12+
13+
bool AutoDelete { get; }
14+
15+
IDictionary<string, object> Arguments { get; }
16+
17+
bool IsServerNamed { get; }
18+
}
19+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
using System;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
/// <summary>
6+
/// Filter to know which entities (exchanges, queues, bindings, consumers) should be recovered by topology recovery.
7+
/// By default, allows all entities to be recovered.
8+
/// </summary>
9+
public class TopologyRecoveryFilter
10+
{
11+
private static readonly Func<IRecordedExchange, bool> s_defaultExchangeFilter = exchange => true;
12+
private static readonly Func<IRecordedQueue, bool> s_defaultQueueFilter = queue => true;
13+
private static readonly Func<IRecordedBinding, bool> s_defaultBindingFilter = binding => true;
14+
private static readonly Func<IRecordedConsumer, bool> s_defaultConsumerFilter = consumer => true;
15+
16+
private Func<IRecordedExchange, bool> _exchangeFilter;
17+
private Func<IRecordedQueue, bool> _queueFilter;
18+
private Func<IRecordedBinding, bool> _bindingFilter;
19+
private Func<IRecordedConsumer, bool> _consumerFilter;
20+
21+
/// <summary>
22+
/// Decides whether an exchange is recovered or not.
23+
/// </summary>
24+
public Func<IRecordedExchange, bool> ExchangeFilter
25+
{
26+
get => _exchangeFilter ?? s_defaultExchangeFilter;
27+
28+
set
29+
{
30+
if (_exchangeFilter != null)
31+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeFilter)} after it has been initialized.");
32+
33+
_exchangeFilter = value ?? throw new ArgumentNullException(nameof(ExchangeFilter));
34+
}
35+
}
36+
37+
/// <summary>
38+
/// Decides whether a queue is recovered or not.
39+
/// </summary>
40+
public Func<IRecordedQueue, bool> QueueFilter
41+
{
42+
get => _queueFilter ?? s_defaultQueueFilter;
43+
44+
set
45+
{
46+
if (_queueFilter != null)
47+
throw new InvalidOperationException($"Cannot modify {nameof(QueueFilter)} after it has been initialized.");
48+
49+
_queueFilter = value ?? throw new ArgumentNullException(nameof(QueueFilter));
50+
}
51+
}
52+
53+
/// <summary>
54+
/// Decides whether a binding is recovered or not.
55+
/// </summary>
56+
public Func<IRecordedBinding, bool> BindingFilter
57+
{
58+
get => _bindingFilter ?? s_defaultBindingFilter;
59+
60+
set
61+
{
62+
if (_bindingFilter != null)
63+
throw new InvalidOperationException($"Cannot modify {nameof(BindingFilter)} after it has been initialized.");
64+
65+
_bindingFilter = value ?? throw new ArgumentNullException(nameof(BindingFilter));
66+
}
67+
}
68+
69+
/// <summary>
70+
/// Decides whether a consumer is recovered or not.
71+
/// </summary>
72+
public Func<IRecordedConsumer, bool> ConsumerFilter
73+
{
74+
get => _consumerFilter ?? s_defaultConsumerFilter;
75+
76+
set
77+
{
78+
if (_consumerFilter != null)
79+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerFilter)} after it has been initialized.");
80+
81+
_consumerFilter = value ?? throw new ArgumentNullException(nameof(ConsumerFilter));
82+
}
83+
}
84+
}
85+
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ public void MaybeDeleteRecordedAutoDeleteExchange(string exchange)
576576
// last binding where this exchange is the source is gone,
577577
// remove recorded exchange
578578
// if it is auto-deleted. See bug 26364.
579-
if ((rx != null) && rx.IsAutoDelete)
579+
if ((rx != null) && rx.AutoDelete)
580580
{
581581
DeleteRecordedExchange(exchange);
582582
}
@@ -593,7 +593,7 @@ public void MaybeDeleteRecordedAutoDeleteQueue(string queue)
593593
_recordedQueues.TryGetValue(queue, out RecordedQueue rq);
594594
// last consumer on this connection is gone, remove recorded queue
595595
// if it is auto-deleted. See bug 26364.
596-
if ((rq != null) && rq.IsAutoDelete)
596+
if ((rq != null) && rq.AutoDelete)
597597
{
598598
DeleteRecordedQueue(queue);
599599
}
@@ -993,7 +993,7 @@ private void RecoverBindings(IModel model)
993993
recordedBindingsCopy = new Dictionary<RecordedBinding, byte>(_recordedBindings);
994994
}
995995

996-
foreach (RecordedBinding b in recordedBindingsCopy.Keys)
996+
foreach (RecordedBinding b in recordedBindingsCopy.Keys.Where(x => _factory.TopologyRecoveryFilter?.BindingFilter(x) ?? true))
997997
{
998998
try
999999
{
@@ -1089,7 +1089,7 @@ internal void RecoverConsumers(AutorecoveringModel modelToRecover, IModel channe
10891089
recordedConsumersCopy = new Dictionary<string, RecordedConsumer>(_recordedConsumers);
10901090
}
10911091

1092-
foreach (KeyValuePair<string, RecordedConsumer> pair in recordedConsumersCopy)
1092+
foreach (KeyValuePair<string, RecordedConsumer> pair in recordedConsumersCopy.Where(x => _factory.TopologyRecoveryFilter?.ConsumerFilter(x.Value) ?? true))
10931093
{
10941094
RecordedConsumer cons = pair.Value;
10951095
if (cons.Model != modelToRecover)
@@ -1154,7 +1154,7 @@ private void RecoverExchanges(IModel model)
11541154
recordedExchangesCopy = new Dictionary<string, RecordedExchange>(_recordedExchanges);
11551155
}
11561156

1157-
foreach (RecordedExchange rx in recordedExchangesCopy.Values)
1157+
foreach (RecordedExchange rx in recordedExchangesCopy.Values.Where(x => _factory.TopologyRecoveryFilter?.ExchangeFilter(x) ?? true))
11581158
{
11591159
try
11601160
{
@@ -1188,7 +1188,7 @@ private void RecoverQueues(IModel model)
11881188
recordedQueuesCopy = new Dictionary<string, RecordedQueue>(_recordedQueues);
11891189
}
11901190

1191-
foreach (KeyValuePair<string, RecordedQueue> pair in recordedQueuesCopy)
1191+
foreach (KeyValuePair<string, RecordedQueue> pair in recordedQueuesCopy.Where(x => _factory.TopologyRecoveryFilter?.QueueFilter(x.Value) ?? true))
11921192
{
11931193
string oldName = pair.Key;
11941194
RecordedQueue rq = pair.Value;

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1530,11 +1530,11 @@ public QueueDeclareOk QueueDeclare(string queue, bool durable,
15301530
QueueDeclareOk result = _delegate.QueueDeclare(queue, durable, exclusive,
15311531
autoDelete, arguments);
15321532
RecordedQueue rq = new RecordedQueue(result.QueueName).
1533-
Durable(durable).
1534-
Exclusive(exclusive).
1535-
AutoDelete(autoDelete).
1536-
Arguments(arguments).
1537-
ServerNamed(string.Empty.Equals(queue));
1533+
WithDurable(durable).
1534+
WithExclusive(exclusive).
1535+
WithAutoDelete(autoDelete).
1536+
WithArguments(arguments).
1537+
WithServerNamed(string.Empty.Equals(queue));
15381538
_connection.RecordQueue(result.QueueName, rq);
15391539
return result;
15401540
}
@@ -1551,11 +1551,11 @@ public void QueueDeclareNoWait(string queue, bool durable,
15511551
_delegate.QueueDeclareNoWait(queue, durable, exclusive,
15521552
autoDelete, arguments);
15531553
RecordedQueue rq = new RecordedQueue(queue).
1554-
Durable(durable).
1555-
Exclusive(exclusive).
1556-
AutoDelete(autoDelete).
1557-
Arguments(arguments).
1558-
ServerNamed(string.Empty.Equals(queue));
1554+
WithDurable(durable).
1555+
WithExclusive(exclusive).
1556+
WithAutoDelete(autoDelete).
1557+
WithArguments(arguments).
1558+
WithServerNamed(string.Empty.Equals(queue));
15591559
_connection.RecordQueue(queue, rq);
15601560
}
15611561

projects/RabbitMQ.Client/client/impl/RecordedBinding.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333

3434
namespace RabbitMQ.Client.Impl
3535
{
36-
internal abstract class RecordedBinding
36+
internal abstract class RecordedBinding : IRecordedBinding
3737
{
3838
public IDictionary<string, object> Arguments { get; protected set; }
3939
public string Destination { get; set; }

projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333

3434
namespace RabbitMQ.Client.Impl
3535
{
36-
internal class RecordedConsumer
36+
internal class RecordedConsumer : IRecordedConsumer
3737
{
3838
public RecordedConsumer(AutorecoveringModel model, string queue)
3939
{

0 commit comments

Comments
 (0)