99using RabbitMQ . Client . Events ;
1010using System ;
1111using System . Collections . Generic ;
12+ using System . IO ;
1213using System . Linq ;
1314using System . Text ;
1415using System . Threading ;
1516
1617namespace IntegrationEngine . MessageQueue
1718{
18- public class RabbitMQListener : IMessageQueueListener , IDisposable
19+ public class RabbitMQListener : IMessageQueueListener
1920 {
2021 Thread listenerThread ;
21- bool shouldTerminate ;
22+ volatile bool shouldTerminate ;
23+ QueueingBasicConsumer consumer ;
2224 public IList < Type > IntegrationJobTypes { get ; set ; }
2325 public MessageQueueConfiguration MessageQueueConfiguration { get ; set ; }
2426 public MessageQueueConnection MessageQueueConnection { get ; set ; }
2527 public ILog Log { get ; set ; }
2628 public IMailClient MailClient { get ; set ; }
2729 public IntegrationEngineContext IntegrationEngineContext { get ; set ; }
2830 public IElasticClient ElasticClient { get ; set ; }
29-
31+
3032 public RabbitMQListener ( )
3133 {
3234 shouldTerminate = false ;
@@ -35,6 +37,8 @@ public RabbitMQListener()
3537 public void Dispose ( )
3638 {
3739 shouldTerminate = true ;
40+ if ( consumer != null )
41+ consumer . Queue . Close ( ) ;
3842 listenerThread . Join ( ) ;
3943 }
4044
@@ -43,35 +47,44 @@ void _listen()
4347 var connection = MessageQueueConnection . GetConnection ( ) ;
4448 using ( var channel = connection . CreateModel ( ) )
4549 {
46- var consumer = new QueueingBasicConsumer ( channel ) ;
50+ consumer = new QueueingBasicConsumer ( channel ) ;
4751 channel . BasicConsume ( MessageQueueConfiguration . QueueName , true , consumer ) ;
48-
4952 Log . Info ( x => x ( "Waiting for messages..." ) ) ;
53+
5054 while ( true )
5155 {
52- if ( shouldTerminate )
53- return ;
54- var eventArgs = ( BasicDeliverEventArgs ) consumer . Queue . Dequeue ( ) ;
55- var body = eventArgs . Body ;
56- var message = JsonConvert . DeserializeObject < DispatchMessage > ( Encoding . UTF8 . GetString ( body ) ) ;
57- Log . Debug ( x => x ( "Message queue listener received {0}" , message ) ) ;
58- if ( IntegrationJobTypes != null && ! IntegrationJobTypes . Any ( ) )
59- continue ;
60- var type = IntegrationJobTypes . FirstOrDefault ( t => t . FullName . Equals ( message . JobTypeName ) ) ;
61- var integrationJob = Activator . CreateInstance ( type ) as IIntegrationJob ;
62- integrationJob = AutoWireJob ( integrationJob , type ) ;
56+ var message = new DispatchMessage ( ) ;
6357 try
6458 {
59+ if ( shouldTerminate )
60+ {
61+ connection . Close ( ) ;
62+ Log . Info ( "Message queue listener has stopped listening for messages." ) ;
63+ return ;
64+ }
65+ var eventArgs = ( BasicDeliverEventArgs ) consumer . Queue . Dequeue ( ) ;
66+ var body = eventArgs . Body ;
67+ message = JsonConvert . DeserializeObject < DispatchMessage > ( Encoding . UTF8 . GetString ( body ) ) ;
68+ Log . Debug ( x => x ( "Message queue listener received {0}" , message ) ) ;
69+ if ( IntegrationJobTypes != null && ! IntegrationJobTypes . Any ( ) )
70+ continue ;
71+ var type = IntegrationJobTypes . FirstOrDefault ( t => t . FullName . Equals ( message . JobTypeName ) ) ;
72+ var integrationJob = Activator . CreateInstance ( type ) as IIntegrationJob ;
73+ integrationJob = AutoWireJob ( integrationJob , type ) ;
6574 if ( integrationJob != null )
6675 {
6776 if ( integrationJob . GetType ( ) is IParameterizedJob )
6877 ( integrationJob as IParameterizedJob ) . Parameters = message . Parameters ;
6978 integrationJob . Run ( ) ;
7079 }
7180 }
81+ catch ( EndOfStreamException exception )
82+ {
83+ Log . Debug ( x => x ( "The message queue ({0}) has closed." , MessageQueueConfiguration . QueueName ) , exception ) ;
84+ }
7285 catch ( Exception exception )
7386 {
74- Log . Error ( x => x ( "Integration job did not run successfully ({0})}" , message ) , exception ) ;
87+ Log . Error ( x => x ( "Integration job did not run successfully ({0})}" , message . JobTypeName ) , exception ) ;
7588 }
7689 }
7790 }
0 commit comments