Skip to content

Commit 9f1131b

Browse files
committed
Merge branch 'feature/wait_for_queue_messages_with_redis'
2 parents a6c8581 + 437dd1f commit 9f1131b

File tree

15 files changed

+202
-29
lines changed

15 files changed

+202
-29
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System.Threading;
2+
using InEngine.Core;
3+
4+
namespace InEngine.Commands.Sample
5+
{
6+
public class Pause : AbstractCommand
7+
{
8+
public override void Run()
9+
{
10+
Thread.Sleep(3000);
11+
}
12+
}
13+
}

src/InEngine.Commands/Sample/SampleCommandsPlugin.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ namespace InEngine.Commands.Sample
55
{
66
public class SampleCommandsPlugin : AbstractPlugin
77
{
8+
[VerbOption("sample:pause", HelpText = "Pause for a few seconds.")]
9+
public Pause Pause { get; set; }
10+
811
[VerbOption("sample:show-progress", HelpText = "A sample command to demonstrate the progress bar.")]
912
public ShowProgress ShowProgress { get; set; }
1013

src/InEngine.Core/Queuing/Clients/FileClient.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22
using System.Collections.Generic;
33
using System.IO;
44
using System.Linq;
5+
using System.Threading;
6+
using System.Threading.Tasks;
57
using InEngine.Core.Exceptions;
68
using InEngine.Core.Queuing.Message;
79

810
namespace InEngine.Core.Queuing.Clients
911
{
1012
public class FileClient : IQueueClient
1113
{
14+
public int Id { get; set; } = 0;
1215
public string QueueBaseName { get; set; }
1316
public string QueueName { get; set; }
1417
public bool UseCompression { get; set; }
@@ -58,6 +61,27 @@ public void Publish(AbstractCommand command)
5861
}
5962
}
6063

64+
public void Consume(CancellationToken cancellationToken)
65+
{
66+
try
67+
{
68+
while(true)
69+
{
70+
if (Consume() == null)
71+
Thread.Sleep(5000);
72+
cancellationToken.ThrowIfCancellationRequested();
73+
}
74+
}
75+
catch (OperationCanceledException)
76+
{
77+
return;
78+
}
79+
catch (Exception exception)
80+
{
81+
Console.WriteLine(exception.Message);
82+
}
83+
}
84+
6185
public ICommandEnvelope Consume()
6286
{
6387
var fileInfo = new DirectoryInfo(PendingQueuePath)
@@ -117,6 +141,9 @@ public void RepublishFailedMessages()
117141
.ForEach(x => x.MoveTo(Path.Combine(PendingQueuePath, x.Name)));
118142
}
119143

144+
public void Recover()
145+
{}
146+
120147
public long GetFailedQueueLength()
121148
{
122149
return new DirectoryInfo(FailedQueuePath).GetFiles().LongCount();

src/InEngine.Core/Queuing/Clients/RedisClient.cs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
46
using InEngine.Core.Exceptions;
57
using InEngine.Core.Queuing.Message;
68
using StackExchange.Redis;
@@ -9,8 +11,10 @@ namespace InEngine.Core.Queuing.Clients
911
{
1012
public class RedisClient : IQueueClient
1113
{
14+
public int Id { get; set; } = 0;
1215
public string QueueBaseName { get; set; } = "InEngineQueue";
1316
public string QueueName { get; set; } = "Primary";
17+
public string RecoveryQueueName { get { return QueueBaseName + $":{QueueName}:Recovery"; } }
1418
public string PendingQueueName { get { return QueueBaseName + $":{QueueName}:Pending"; } }
1519
public string InProgressQueueName { get { return QueueBaseName + $":{QueueName}:InProgress"; } }
1620
public string FailedQueueName { get { return QueueBaseName + $":{QueueName}:Failed"; } }
@@ -29,8 +33,17 @@ public class RedisClient : IQueueClient
2933
public bool UseCompression { get; set; }
3034
public int RedisDb { get; set; }
3135

36+
public RedisChannel RedisChannel { get; set; }
37+
38+
public void InitChannel()
39+
{
40+
if (RedisChannel.IsNullOrEmpty)
41+
RedisChannel = new RedisChannel(QueueBaseName, RedisChannel.PatternMode.Auto);
42+
}
43+
3244
public void Publish(AbstractCommand command)
3345
{
46+
InitChannel();
3447
Redis.ListLeftPush(
3548
PendingQueueName,
3649
new CommandEnvelope() {
@@ -40,6 +53,39 @@ public void Publish(AbstractCommand command)
4053
SerializedCommand = command.SerializeToJson(UseCompression)
4154
}.SerializeToJson()
4255
);
56+
57+
var count = PublishToChannel($"published command: {command.Name}");
58+
}
59+
60+
public long PublishToChannel(string message = "published command.")
61+
{
62+
InitChannel();
63+
return Connection.GetSubscriber().Publish(RedisChannel, message);
64+
}
65+
66+
public void Recover()
67+
{
68+
for (var i = 0; i < Redis.ListLength(PendingQueueName); i++)
69+
PublishToChannel();
70+
}
71+
72+
public void Consume(CancellationToken cancellationToken)
73+
{
74+
try
75+
{
76+
InitChannel();
77+
Connection.GetSubscriber().Subscribe(RedisChannel, delegate {
78+
Task.Factory.StartNew(Consume, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
79+
});
80+
}
81+
catch (OperationCanceledException)
82+
{
83+
return;
84+
}
85+
catch (Exception exception)
86+
{
87+
Console.WriteLine(exception.Message);
88+
}
4389
}
4490

4591
public ICommandEnvelope Consume()
@@ -51,7 +97,7 @@ public ICommandEnvelope Consume()
5197
var commandEnvelope = serializedMessage.DeserializeFromJson<CommandEnvelope>();
5298
if (commandEnvelope == null)
5399
throw new CommandFailedException("Could not deserialize the command.");
54-
100+
55101
var command = QueueAdapter.ExtractCommandInstanceFromMessage(commandEnvelope);
56102
command.CommandLifeCycle.IncrementRetry();
57103
commandEnvelope.SerializedCommand = command.SerializeToJson(UseCompression);
@@ -65,9 +111,10 @@ public ICommandEnvelope Consume()
65111
Redis.ListRemove(InProgressQueueName, serializedMessage, 1);
66112
if (command.CommandLifeCycle.ShouldRetry())
67113
Redis.ListLeftPush(PendingQueueName, commandEnvelope.SerializeToJson());
68-
else {
114+
else
115+
{
69116
Redis.ListLeftPush(FailedQueueName, commandEnvelope.SerializeToJson());
70-
throw new CommandFailedException("Failed to run consumed command.", exception);
117+
throw new CommandFailedException("Failed to run consumed command.", exception);
71118
}
72119
}
73120

src/InEngine.Core/Queuing/Clients/SyncClient.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using InEngine.Core.Queuing.Message;
45

56
namespace InEngine.Core.Queuing.Clients
67
{
78
public class SyncClient : IQueueClient
89
{
10+
public int Id { get; set; } = 0;
911
public string QueueBaseName { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
1012
public string QueueName { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
1113
public bool UseCompression { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
@@ -15,6 +17,14 @@ public void Publish(AbstractCommand command)
1517
command.Run();
1618
}
1719

20+
public void Recover()
21+
{}
22+
23+
public void Consume(CancellationToken cancellationToken)
24+
{
25+
throw new NotImplementedException();
26+
}
27+
1828
public ICommandEnvelope Consume()
1929
{
2030
throw new NotImplementedException();
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace InEngine.Core.Queuing
8+
{
9+
public class Dequeue
10+
{
11+
IList<QueueAdapter> queueAdapters;
12+
public CancellationTokenSource CancellationTokenSource { get; set; }
13+
public int TaskCount { get; set; }
14+
15+
public Dequeue()
16+
{
17+
queueAdapters = new List<QueueAdapter>();
18+
CancellationTokenSource = new CancellationTokenSource();
19+
}
20+
21+
public async Task StartAsync()
22+
{
23+
// Create dequeue tasks for primary and secondary queues.
24+
var allTasks = new List<Task>();
25+
Console.WriteLine("Start dequeue tasks for primary queue...");
26+
allTasks.AddRange(MakeTasks());
27+
Console.WriteLine("Start dequeue tasks for secondary queue...");
28+
allTasks.AddRange(MakeTasks());
29+
await Task.WhenAll(allTasks);
30+
31+
// Recover from restart, if necessary.
32+
QueueAdapter.Make().Recover();
33+
QueueAdapter.Make(true).Recover();
34+
}
35+
36+
public IList<Task> MakeTasks()
37+
{
38+
return Enumerable.Range(0, TaskCount).Select((i) => {
39+
Console.WriteLine($"Registering Dequeuer #{i}");
40+
return Task.Factory.StartNew(() => {
41+
var queue = QueueAdapter.Make();
42+
queue.Id = i;
43+
queueAdapters.Add(queue);
44+
queue.Consume(CancellationTokenSource.Token);
45+
}, TaskCreationOptions.LongRunning);
46+
}).ToList();
47+
}
48+
49+
public void Dispose()
50+
{
51+
CancellationTokenSource.Cancel();
52+
}
53+
}
54+
}

src/InEngine.Core/Queuing/IQueueClient.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
using System.Collections.Generic;
2+
using System.Threading;
23
using InEngine.Core.Queuing.Message;
34

45
namespace InEngine.Core.Queuing
56
{
67
public interface IQueueClient
78
{
9+
int Id { get; set; }
810
string QueueBaseName { get; set; }
911
string QueueName { get; set; }
1012
bool UseCompression { get; set; }
1113
void Publish(AbstractCommand command);
14+
void Consume(CancellationToken cancellationToken);
1215
ICommandEnvelope Consume();
16+
void Recover();
1317
long GetPendingQueueLength();
1418
long GetInProgressQueueLength();
1519
long GetFailedQueueLength();

src/InEngine.Core/Queuing/QueueAdapter.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Reflection;
4+
using System.Threading;
45
using InEngine.Core.Exceptions;
56
using InEngine.Core.Queuing.Clients;
67
using InEngine.Core.Queuing.Message;
@@ -9,6 +10,7 @@ namespace InEngine.Core.Queuing
910
{
1011
public class QueueAdapter : IQueueClient
1112
{
13+
public int Id { get { return QueueClient.Id; } set { QueueClient.Id = value; } }
1214
public IQueueClient QueueClient { get; set; }
1315
public string QueueBaseName { get => QueueClient.QueueBaseName; set => QueueClient.QueueBaseName = value; }
1416
public string QueueName { get => QueueClient.QueueName; set => QueueClient.QueueName = value; }
@@ -46,11 +48,21 @@ public void Publish(AbstractCommand command)
4648
QueueClient.Publish(command);
4749
}
4850

51+
public void Consume(CancellationToken CancellationToken)
52+
{
53+
QueueClient.Consume(CancellationToken);
54+
}
55+
4956
public ICommandEnvelope Consume()
5057
{
5158
return QueueClient.Consume();
5259
}
5360

61+
public void Recover()
62+
{
63+
QueueClient.Recover();
64+
}
65+
5466
public static AbstractCommand ExtractCommandInstanceFromMessage(ICommandEnvelope commandEnvelope)
5567
{
5668
var commandType = PluginAssembly.LoadFrom(commandEnvelope.PluginName)

src/InEngine.Core/Queuing/QueuingPlugin.cs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,24 +27,5 @@ public class QueuingPlugin : AbstractPlugin
2727
[VerbOption("queue:peek", HelpText = "Peek at messages in the primary or secondary queues.")]
2828
public Peek Peek { get; set; }
2929

30-
public override void Schedule(ISchedule schedule)
31-
{
32-
var queueSettings = InEngineSettings.Make().Queue;
33-
ScheduleQueueConsumerJobs(schedule, queueSettings.PrimaryQueueConsumers);
34-
ScheduleQueueConsumerJobs(schedule, queueSettings.SecondaryQueueConsumers, true);
35-
}
36-
37-
void ScheduleQueueConsumerJobs(ISchedule schedule, int consumers, bool useSecondaryQueue = false)
38-
{
39-
if (consumers < 0)
40-
throw new ArgumentOutOfRangeException(nameof(consumers), consumers, "The number of queue consumers must be 0 or greater.");
41-
42-
foreach (var index in Enumerable.Range(0, consumers).ToList())
43-
schedule.Command(new Consume() {
44-
ScheduleId = $"{(useSecondaryQueue ? "secondary" : "primary")}:{index.ToString()}",
45-
UseSecondaryQueue = useSecondaryQueue
46-
})
47-
.EverySecond();
48-
}
4930
}
5031
}

src/InEngine/ArgumentInterpreter.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ public void Interpret(string[] args)
4747

4848
InEngineSettings.ConfigurationFile = options.ConfigurationFile;
4949

50-
if (options.ShouldRunScheduler)
50+
if (options.ShouldRunServer)
5151
{
5252
Write.Info(CliLogo);
53-
Write.Line("Starting the scheduler...").Newline();
54-
Program.RunScheduler();
53+
Write.Line("Starting...").Newline();
54+
Program.RunServer();
5555
ExitWithSuccess();
5656
}
5757

0 commit comments

Comments
 (0)