Skip to content

Commit fb748c6

Browse files
committed
Use async/await in JobProcessor instead of Thread
1 parent c8e341c commit fb748c6

14 files changed

+176
-122
lines changed

IntegrationEngine.Tests/EngineHostCompositionRootTest.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ public void ShouldSetupMessageQueueListener()
5656
Subject.Container.RegisterType<IRabbitMQConfiguration, RabbitMQConfiguration>(configName,
5757
new InjectionConstructor(new ResolvedParameter<IEngineConfiguration>(), configName));
5858

59-
Subject.SetupThreadedListenerManager();
59+
Subject.SetupMessageQueueListenerManager();
6060

61-
Subject.Container.Resolve<IThreadedListenerManager>();
61+
Assert.That(Subject.MessageQueueListenerManager, Is.Not.Null);
6262
}
6363

6464
[Test]

IntegrationEngine.Tests/IntegrationEngine.Tests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@
135135
<Compile Include="Api\TriggerControllerBaseTest.cs" />
136136
<Compile Include="Api\TriggerStub.cs" />
137137
<Compile Include="EngineHostCompositionRootTest.cs" />
138-
<Compile Include="JobProcessor\ThreadedListenerManagerTest.cs" />
139138
<Compile Include="Api\WebApiApplicationTest.cs" />
139+
<Compile Include="JobProcessor\MessageQueueListenerManagerTest.cs" />
140140
</ItemGroup>
141141
<ItemGroup>
142142
<None Include="..\configuration\IntegrationEngine.json">
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using BeekmanLabs.UnitTesting;
2+
using IntegrationEngine.JobProcessor;
3+
using NUnit.Framework;
4+
using Moq;
5+
using System;
6+
using System.Threading;
7+
8+
namespace IntegrationEngine.Tests
9+
{
10+
public class MessageQueueListenerManagerTest : TestBase<MessageQueueListenerManager>
11+
{
12+
public Mock<MessageQueueListenerFactory> MockMessageQueueListenerFactory { get; set; }
13+
14+
[SetUp]
15+
public void Setup()
16+
{
17+
MockMessageQueueListenerFactory = new Mock<MessageQueueListenerFactory>();
18+
MockMessageQueueListenerFactory.Setup(x => x.CreateRabbitMQListener())
19+
.Returns<IMessageQueueListener>(null);
20+
Subject.MessageQueueListenerFactory = MockMessageQueueListenerFactory.Object;
21+
}
22+
23+
[Test]
24+
public void ShouldStartListener()
25+
{
26+
Subject.ListenerTaskCount = 1;
27+
28+
Subject.StartListener();
29+
30+
MockMessageQueueListenerFactory.Verify(x => x.CreateRabbitMQListener(), Times.Once);
31+
}
32+
33+
[Test]
34+
public void ShouldStartMultipleListeners()
35+
{
36+
var listenerTaskCount = 4;
37+
Subject.ListenerTaskCount = listenerTaskCount;
38+
39+
Subject.StartListener();
40+
41+
MockMessageQueueListenerFactory.Verify(x => x.CreateRabbitMQListener(),
42+
Times.Exactly(listenerTaskCount));
43+
}
44+
45+
[Test]
46+
public void ShouldSetCancellationTokenOnDispose()
47+
{
48+
Subject.CancellationTokenSource = new CancellationTokenSource();
49+
50+
Subject.Dispose();
51+
52+
Assert.That(Subject.CancellationTokenSource.IsCancellationRequested, Is.True);
53+
}
54+
}
55+
}
56+

IntegrationEngine.Tests/JobProcessor/ThreadedListenerManagerTest.cs

Lines changed: 0 additions & 33 deletions
This file was deleted.

IntegrationEngine/EngineHost.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public void Initialize(bool isThreadedListenerEnabled = true,
3030
engineHostCompositionRoot = new EngineHostCompositionRoot(AssembliesWithJobs) {
3131
IsWebApiEnabled = isWebApiEnabled,
3232
IsSchedulerEnabled = isSchedulerEnabled,
33-
IsThreadedListenerEnabled = isThreadedListenerEnabled,
33+
IsMessageQueueListenerManagerEnabled = isThreadedListenerEnabled,
3434
};
3535
engineHostCompositionRoot.Configure();
3636
}

IntegrationEngine/EngineHostCompositionRoot.cs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ public class EngineHostCompositionRoot : IDisposable
3333
public IList<Type> IntegrationJobTypes { get; set; }
3434
public ILog Log { get; set; }
3535
public IWebApiApplication WebApiApplication { get; set; }
36+
public IMessageQueueListenerManager MessageQueueListenerManager { get; set; }
3637
public bool IsWebApiEnabled { get; set; }
3738
public bool IsSchedulerEnabled { get; set; }
38-
public bool IsThreadedListenerEnabled { get; set; }
39+
public bool IsMessageQueueListenerManagerEnabled { get; set; }
3940

4041
public EngineHostCompositionRoot()
4142
{}
@@ -63,8 +64,8 @@ public void Configure()
6364
RegisterIntegrationJobs();
6465
SetupRScriptRunner();
6566
SetupElasticsearchRepository();
66-
if (IsThreadedListenerEnabled)
67-
SetupThreadedListenerManager();
67+
if (IsMessageQueueListenerManagerEnabled)
68+
SetupMessageQueueListenerManager();
6869
if (IsSchedulerEnabled)
6970
SetupEngineScheduler();
7071
if (IsWebApiEnabled)
@@ -173,21 +174,14 @@ public void RegisterIntegrationJobs()
173174
});
174175
}
175176

176-
public void SetupThreadedListenerManager()
177+
public async void SetupMessageQueueListenerManager()
177178
{
178179
var config = Container.Resolve<IRabbitMQConfiguration>("DefaultRabbitMQ");
179-
var rabbitMqListener = new RabbitMQListener() {
180-
IntegrationJobTypes = IntegrationJobTypes,
181-
MessageQueueConnection = new MessageQueueConnection(config),
182-
RabbitMQConfiguration = config,
183-
UnityContainer = Container,
184-
};
185-
186-
var threadedListenerManager = new ThreadedListenerManager() {
187-
MessageQueueListener = rabbitMqListener,
180+
var messageQueueListenerFactory = new MessageQueueListenerFactory(Container, IntegrationJobTypes, config);
181+
MessageQueueListenerManager = new MessageQueueListenerManager() {
182+
MessageQueueListenerFactory = messageQueueListenerFactory,
188183
};
189-
Container.RegisterInstance<IThreadedListenerManager>(threadedListenerManager);
190-
threadedListenerManager.StartListener();
184+
await MessageQueueListenerManager.StartListener();
191185
}
192186

193187
public void SetupEngineScheduler()
@@ -241,6 +235,8 @@ public void Dispose()
241235
{
242236
if (WebApiApplication != null)
243237
WebApiApplication.Dispose();
238+
if (MessageQueueListenerManager != null)
239+
MessageQueueListenerManager.Dispose();
244240
}
245241
}
246242
}

IntegrationEngine/IntegrationEngine.csproj

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,13 @@
176176
<Compile Include="Api\TriggerControllerBase.cs" />
177177
<Compile Include="Api\Controllers\LogEventController.cs" />
178178
<Compile Include="EngineHostCompositionRoot.cs" />
179-
<Compile Include="JobProcessor\IThreadedListenerManager.cs" />
180179
<Compile Include="JobProcessor\IMessageQueueListener.cs" />
181180
<Compile Include="JobProcessor\MsmqListener.cs" />
182181
<Compile Include="JobProcessor\RabbitMQListener.cs" />
183182
<Compile Include="EngineConfiguration.cs" />
184-
<Compile Include="JobProcessor\ThreadedListenerManager.cs" />
183+
<Compile Include="JobProcessor\IMessageQueueListenerManager.cs" />
184+
<Compile Include="JobProcessor\MessageQueueListenerManager.cs" />
185+
<Compile Include="JobProcessor\MessageQueueListenerFactory.cs" />
185186
</ItemGroup>
186187
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
187188
<ItemGroup>

IntegrationEngine/JobProcessor/IMessageQueueListener.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ namespace IntegrationEngine.JobProcessor
55
{
66
public interface IMessageQueueListener : IDisposable
77
{
8-
void Listen(CancellationToken cancellationToken);
8+
void Listen(CancellationToken cancellationToken, int listenerId);
99
}
1010
}

IntegrationEngine/JobProcessor/IThreadedListenerManager.cs renamed to IntegrationEngine/JobProcessor/IMessageQueueListenerManager.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
using System.Reflection;
1515
using System.Text;
1616
using System.Threading;
17+
using System.Threading.Tasks;
1718

1819
namespace IntegrationEngine.JobProcessor
1920
{
20-
public interface IThreadedListenerManager : IDisposable
21+
public interface IMessageQueueListenerManager : IDisposable
2122
{
2223
CancellationTokenSource CancellationTokenSource { get; set; }
23-
IMessageQueueListener MessageQueueListener { get; set; }
24+
MessageQueueListenerFactory MessageQueueListenerFactory { get; set; }
2425
ILog Log { get; set; }
25-
void StartListener();
26+
Task StartListener();
2627
}
2728
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
using IntegrationEngine.Core.Configuration;
2+
using IntegrationEngine.Core.MessageQueue;
3+
using Microsoft.Practices.Unity;
4+
using System;
5+
using System.Collections.Generic;
6+
7+
namespace IntegrationEngine.JobProcessor
8+
{
9+
public class MessageQueueListenerFactory
10+
{
11+
public IUnityContainer UnityContainer { get; set; }
12+
public IList<Type> IntegrationJobTypes { get; set; }
13+
public IRabbitMQConfiguration RabbitMQConfiguration { get; set; }
14+
15+
public MessageQueueListenerFactory()
16+
{
17+
}
18+
19+
public MessageQueueListenerFactory(IUnityContainer unityContainer, IList<Type> integrationJobTypes, IRabbitMQConfiguration rabbitMQConfiguration)
20+
: this()
21+
{
22+
UnityContainer = unityContainer;
23+
IntegrationJobTypes = integrationJobTypes;
24+
RabbitMQConfiguration = rabbitMQConfiguration;
25+
}
26+
27+
public virtual IMessageQueueListener CreateRabbitMQListener()
28+
{
29+
return new RabbitMQListener() {
30+
IntegrationJobTypes = IntegrationJobTypes,
31+
MessageQueueConnection = new MessageQueueConnection(RabbitMQConfiguration),
32+
RabbitMQConfiguration = RabbitMQConfiguration,
33+
UnityContainer = UnityContainer,
34+
};
35+
}
36+
}
37+
}

0 commit comments

Comments
 (0)