Skip to content

Commit 33b679b

Browse files
committed
Add support for failed/dead letter queue
1 parent 6986b16 commit 33b679b

File tree

1 file changed

+41
-21
lines changed

1 file changed

+41
-21
lines changed

src/InEngine.Core/Queuing/Clients/RabbitMQClient.cs

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ public class RabbitMQClient : IQueueClient, IDisposable
2121
public string PendingQueueName { get { return QueueBaseName + $":{QueueName}:Pending"; } }
2222
public string FailedQueueName { get { return QueueBaseName + $":{QueueName}:Failed"; } }
2323
public bool UseCompression { get; set; }
24+
public string DeadLetterExchangeName { get { return QueueBaseName + $":DeadLetter"; } }
25+
public string ExchangeName { get { return QueueBaseName; } }
26+
public string RoutingKey { get { return QueueName; } }
2427
IConnection _connection;
2528
public IConnection Connection { get {
2629
if (_connection == null) {
@@ -34,7 +37,6 @@ public IConnection Connection { get {
3437
factory.UserName = ClientSettings.Username;
3538
factory.Password = ClientSettings.Password;
3639
}
37-
3840
_connection = factory.CreateConnection();
3941
}
4042
return _connection;
@@ -49,9 +51,21 @@ public IModel Channel {
4951
}
5052
}
5153

52-
public void Publish(AbstractCommand command)
54+
public void InitChannel()
5355
{
56+
Channel.ExchangeDeclare(QueueBaseName, ExchangeType.Direct);
57+
Channel.ExchangeDeclare(DeadLetterExchangeName, ExchangeType.Direct);
58+
Channel.QueueDeclare(PendingQueueName, true, false, false, new Dictionary<string, object> {
59+
{ "x-dead-letter-exchange", DeadLetterExchangeName }
60+
});
61+
Channel.QueueBind(PendingQueueName, ExchangeName, RoutingKey, null);
62+
Channel.QueueDeclare(FailedQueueName, true, false, false);
63+
Channel.QueueBind(FailedQueueName, DeadLetterExchangeName, RoutingKey, null);
64+
}
5465

66+
public void Publish(AbstractCommand command)
67+
{
68+
InitChannel();
5569
var body = Encoding.UTF8.GetBytes(new CommandEnvelope()
5670
{
5771
IsCompressed = UseCompression,
@@ -60,24 +74,21 @@ public void Publish(AbstractCommand command)
6074
SerializedCommand = command.SerializeToJson(UseCompression)
6175
}.SerializeToJson());
6276

63-
Channel.ExchangeDeclare(QueueBaseName, ExchangeType.Direct);
64-
Channel.QueueDeclare(PendingQueueName, true, false, false);
6577
var properties = Channel.CreateBasicProperties();
6678
properties.Persistent = true;
67-
Channel.QueueBind(PendingQueueName, QueueBaseName, QueueName, null);
68-
Channel.BasicPublish(exchange: QueueBaseName,
69-
routingKey: QueueName,
70-
basicProperties: properties,
71-
mandatory: true,
72-
body: body);
79+
Channel.BasicPublish(exchange: ExchangeName,
80+
routingKey: RoutingKey,
81+
basicProperties: properties,
82+
mandatory: true,
83+
body: body);
7384
}
7485

7586
public void Recover()
7687
{ }
7788

7889
public void Consume(CancellationToken cancellationToken)
7990
{
80-
Channel.QueueDeclare(PendingQueueName, true, false, false);
91+
InitChannel();
8192
var consumer = new EventingBasicConsumer(Channel);
8293
consumer.Received += (model, result) => {
8394
var eventingConsumer = (EventingBasicConsumer)model;
@@ -114,6 +125,7 @@ public void Consume(CancellationToken cancellationToken)
114125

115126
public ICommandEnvelope Consume()
116127
{
128+
InitChannel();
117129
var result = Channel.BasicGet(PendingQueueName, false);
118130
if (result == null)
119131
return null;
@@ -146,32 +158,37 @@ public ICommandEnvelope Consume()
146158
return commandEnvelope;
147159
}
148160

149-
public bool ClearFailedQueue()
161+
public bool ClearPendingQueue()
150162
{
151-
throw new NotImplementedException();
163+
InitChannel();
164+
return Channel.QueuePurge(PendingQueueName) > 0;
152165
}
153166

154-
public bool ClearInProgressQueue()
167+
public bool ClearFailedQueue()
155168
{
156-
throw new NotImplementedException();
169+
InitChannel();
170+
return Channel.QueuePurge(FailedQueueName) > 0;
157171
}
158172

159-
public bool ClearPendingQueue()
173+
public long GetPendingQueueLength()
160174
{
161-
throw new NotImplementedException();
175+
InitChannel();
176+
return Channel.MessageCount(PendingQueueName);
162177
}
163178

164179
public long GetFailedQueueLength()
165180
{
166-
throw new NotImplementedException();
181+
InitChannel();
182+
return Channel.MessageCount(FailedQueueName);
167183
}
168184

169-
public long GetInProgressQueueLength()
185+
#region Not implemented
186+
public bool ClearInProgressQueue()
170187
{
171188
throw new NotImplementedException();
172189
}
173190

174-
public long GetPendingQueueLength()
191+
public long GetInProgressQueueLength()
175192
{
176193
throw new NotImplementedException();
177194
}
@@ -193,12 +210,15 @@ public List<ICommandEnvelope> PeekPendingMessages(long from, long to)
193210

194211
public void RepublishFailedMessages()
195212
{
213+
196214
throw new NotImplementedException();
197215
}
216+
#endregion
198217

199218
public void Dispose()
200219
{
201-
Connection.Close();
220+
if (Connection != null && Connection.IsOpen)
221+
Connection.Close();
202222
}
203223
}
204224
}

0 commit comments

Comments
 (0)