Skip to content

Commit e81f196

Browse files
committed
Allow number of queue consumers to be configurable
1 parent 79ff913 commit e81f196

File tree

4 files changed

+16
-30
lines changed

4 files changed

+16
-30
lines changed

src/InEngine.Core.Tests/Queue/Commands/PublishTest.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using System.Linq;
34
using BeekmanLabs.UnitTesting;
45
using InEngine.Core.Commands;
@@ -62,11 +63,11 @@ public void ShouldFailWhenCommandDoesNotExist()
6263
{
6364
Subject.CommandAssembly = "foo";
6465
Subject.CommandClass = "bar";
65-
var expectedMessage = "Could not load plugin: " + Subject.CommandAssembly;
66+
var expectedMessage = "Plugin not found at ";
6667

6768
var result = Assert.Throws<PluginNotFoundException>(() => { Subject.Run(); });
6869

69-
Assert.AreEqual(expectedMessage, result.Message);
70+
Assert.IsTrue(result.Message.StartsWith(expectedMessage, StringComparison.InvariantCulture));
7071
}
7172
}
7273
}

src/InEngine.Core/Queue/Jobs.cs

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,36 +10,17 @@ public class Jobs : IJobs
1010
{
1111
public void Schedule(IScheduler scheduler)
1212
{
13-
ScheduleQueueConsumerJobs(scheduler);
14-
ScheduleQueueConsumerJobs(scheduler, true);
15-
//foreach (var index in Enumerable.Range(0, 8).ToList())
16-
//{
17-
// var consume = new Consume();
18-
// consume.Name = consume.Name + index;
19-
// var primaryQueueConsumer = consume.MakeJobBuilder().Build();
20-
// var secondaryQueueConsumer = consume.MakeJobBuilder().Build();
21-
// secondaryQueueConsumer.JobDataMap.Add("useSecondaryQueue", true);
22-
23-
// var primaryTrigger = consume
24-
// .MakeTriggerBuilder()
25-
// .StartNow()
26-
// .WithSimpleSchedule(x => x.WithIntervalInSeconds(1).RepeatForever())
27-
// .Build();
28-
29-
// var secondaryTrigger = consume
30-
// .MakeTriggerBuilder()
31-
// .StartNow()
32-
// .WithSimpleSchedule(x => x.WithIntervalInSeconds(1).RepeatForever())
33-
// .Build();
34-
35-
// scheduler.ScheduleJob(primaryQueueConsumer, primaryTrigger);
36-
// scheduler.ScheduleJob(secondaryQueueConsumer, secondaryTrigger);
37-
//}
13+
var queueSettings = InEngineSettings.Make().Queue;
14+
ScheduleQueueConsumerJobs(scheduler, queueSettings.PrimaryQueueConsumers);
15+
ScheduleQueueConsumerJobs(scheduler, queueSettings.SecondaryQueueConsumers, true);
3816
}
3917

40-
private void ScheduleQueueConsumerJobs(IScheduler scheduler, bool useSecondaryQueue = false)
18+
private void ScheduleQueueConsumerJobs(IScheduler scheduler, int consumers, bool useSecondaryQueue = false)
4119
{
42-
foreach (var index in Enumerable.Range(0, 8).ToList())
20+
if (consumers < 0) {
21+
throw new ArgumentOutOfRangeException(nameof(consumers), consumers, "The number of queue consumers must be 0 or greater.");
22+
}
23+
foreach (var index in Enumerable.Range(0, consumers).ToList())
4324
{
4425
var consume = new Consume() {
4526
ScheduleId = $"{(useSecondaryQueue ? "secondary" : "primary")}:{index.ToString()}"

src/InEngine.Core/Queue/QueueSettings.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ namespace InEngine.Core.Queue
33
{
44
public class QueueSettings
55
{
6+
public int PrimaryQueueConsumers { get; set; } = 8;
7+
public int SecondaryQueueConsumers { get; set; } = 8;
68
public string QueueName { get; set; }
79
public string RedisHost { get; set; }
810
public int RedisPort { get; set; }

src/InEngine/appsettings.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
{
22
"InEngine": {
33
"Queue": {
4+
"PrimaryQueueConsumers": 16,
5+
"SecondaryQueueConsumers": -1,
46
"QueueName": "InEngine:Queue",
57
"RedisHost": "localhost",
68
"RedisPort": 6379,

0 commit comments

Comments
 (0)