Skip to content

Commit cdb09c2

Browse files
committed
Apply settings to threaded dequeuer
1 parent d95d9fb commit cdb09c2

File tree

3 files changed

+16
-14
lines changed

3 files changed

+16
-14
lines changed

src/InEngine.Core/Queuing/Dequeue.cs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66

77
namespace InEngine.Core.Queuing
88
{
9-
public class Dequeue
9+
public class Dequeue : IDisposable
1010
{
1111
IList<QueueAdapter> queueAdapters;
1212
public CancellationTokenSource CancellationTokenSource { get; set; }
13-
public int TaskCount { get; set; }
13+
public QueueSettings QueueSettings { get; set; }
1414

1515
public Dequeue()
1616
{
@@ -20,25 +20,28 @@ public Dequeue()
2020

2121
public async Task StartAsync()
2222
{
23+
if (QueueSettings == null)
24+
QueueSettings = InEngineSettings.Make().Queue;
25+
2326
// Create dequeue tasks for primary and secondary queues.
2427
var allTasks = new List<Task>();
2528
Console.WriteLine("Start dequeue tasks for primary queue...");
26-
allTasks.AddRange(MakeTasks());
29+
allTasks.AddRange(MakeTasks(true, QueueSettings.PrimaryQueueConsumers));
2730
Console.WriteLine("Start dequeue tasks for secondary queue...");
28-
allTasks.AddRange(MakeTasks());
31+
allTasks.AddRange(MakeTasks(false, QueueSettings.SecondaryQueueConsumers));
2932
await Task.WhenAll(allTasks);
3033

3134
// Recover from restart, if necessary.
3235
QueueAdapter.Make().Recover();
3336
QueueAdapter.Make(true).Recover();
3437
}
3538

36-
public IList<Task> MakeTasks()
39+
IList<Task> MakeTasks(bool useSecondaryQueue = false, int numberOfTasks = 0)
3740
{
38-
return Enumerable.Range(0, TaskCount).Select((i) => {
41+
return Enumerable.Range(0, numberOfTasks).Select((i) => {
3942
Console.WriteLine($"Registering Dequeuer #{i}");
4043
return Task.Factory.StartNew(() => {
41-
var queue = QueueAdapter.Make();
44+
var queue = QueueAdapter.Make(useSecondaryQueue, QueueSettings);
4245
queue.Id = i;
4346
queueAdapters.Add(queue);
4447
queue.Consume(CancellationTokenSource.Token);

src/InEngine.Core/Queuing/QueueAdapter.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ public class QueueAdapter : IQueueClient
1616
public string QueueName { get => QueueClient.QueueName; set => QueueClient.QueueName = value; }
1717
public bool UseCompression { get => QueueClient.UseCompression; set => QueueClient.UseCompression = value; }
1818

19-
public static QueueAdapter Make(bool useSecondaryQueue = false)
19+
public static QueueAdapter Make(bool useSecondaryQueue = false, QueueSettings queueSettings = null)
2020
{
21-
var queueSettings = InEngineSettings.Make().Queue;
21+
if (queueSettings == null)
22+
queueSettings = InEngineSettings.Make().Queue;
2223
var queueDriverName = queueSettings.QueueDriver.ToLower();
2324
var queue = new QueueAdapter();
2425

src/InEngine/ServerHost.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,14 @@ namespace InEngine
99
public class ServerHost : IDisposable
1010
{
1111
public SuperScheduler SuperScheduler { get; set; }
12-
public Dequeue ConsumeServer { get; set; }
12+
public Dequeue Dequeue { get; set; }
1313

1414
public ServerHost()
1515
{
1616
SuperScheduler = new SuperScheduler();
1717
Common.Logging.LogManager.Adapter = new Common.Logging.Simple.ConsoleOutLoggerFactoryAdapter { Level = Common.Logging.LogLevel.Info };
1818
SuperScheduler.Initialize();
19-
ConsumeServer = new Dequeue() {
20-
TaskCount = 10
21-
};
19+
Dequeue = new Dequeue();
2220
}
2321

2422
public void Dispose()
@@ -34,7 +32,7 @@ public void Start()
3432

3533
public async void StartQueueServerAsync()
3634
{
37-
await ConsumeServer.StartAsync();
35+
await Dequeue.StartAsync();
3836
}
3937
}
4038
}

0 commit comments

Comments
 (0)