Skip to content

Commit 9f332b4

Browse files
author
Ethan Hann
committed
Run MQ listener in a thread
1 parent 473e4f6 commit 9f332b4

File tree

1 file changed

+31
-4
lines changed

1 file changed

+31
-4
lines changed

IntegrationEngine/MessageQueue/RabbitMQListener.cs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,34 @@
1111
using System.Collections.Generic;
1212
using System.Linq;
1313
using System.Text;
14+
using System.Threading;
1415

1516
namespace IntegrationEngine.MessageQueue
1617
{
17-
public class RabbitMQListener : IMessageQueueListener
18+
public class RabbitMQListener : IMessageQueueListener, IDisposable
1819
{
20+
Thread listenerThread;
21+
bool shouldTerminate;
1922
public IList<Type> IntegrationJobTypes { get; set; }
2023
public MessageQueueConfiguration MessageQueueConfiguration { get; set; }
2124
public MessageQueueConnection MessageQueueConnection { get; set; }
2225
public ILog Log { get; set; }
2326
public IMailClient MailClient { get; set; }
2427
public IntegrationEngineContext IntegrationEngineContext { get; set; }
2528
public IElasticClient ElasticClient { get; set; }
26-
29+
2730
public RabbitMQListener()
28-
{}
31+
{
32+
shouldTerminate = false;
33+
}
34+
35+
void Dispose()
36+
{
37+
shouldTerminate = true;
38+
listenerThread.Join();
39+
}
2940

30-
public void Listen()
41+
void _listen()
3142
{
3243
var connection = MessageQueueConnection.GetConnection();
3344
using (var channel = connection.CreateModel())
@@ -38,6 +49,8 @@ public void Listen()
3849
Log.Info(x => x("Waiting for messages..."));
3950
while (true)
4051
{
52+
if (shouldTerminate)
53+
return;
4154
var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
4255
var body = eventArgs.Body;
4356
var message = JsonConvert.DeserializeObject<DispatchMessage>(Encoding.UTF8.GetString(body));
@@ -64,6 +77,20 @@ public void Listen()
6477
}
6578
}
6679

80+
public void Listen()
81+
{
82+
if (listenerThread == null)
83+
listenerThread = new Thread(_listen);
84+
if (listenerThread.ThreadState == ThreadState.Running)
85+
{
86+
Log.Info("Message queue listener already running.");
87+
return;
88+
}
89+
90+
listenerThread.Start();
91+
Log.Info("Message queue listener started.");
92+
}
93+
6794
T AutoWireJob<T>(T job, Type type)
6895
{
6996
if (type.GetInterface(typeof(IMailJob).Name) != null)

0 commit comments

Comments
 (0)