Skip to content

Commit 8249eab

Browse files
committed
Refactor broker
1 parent 67429da commit 8249eab

File tree

6 files changed

+63
-108
lines changed

6 files changed

+63
-108
lines changed

src/InEngine.Core/Queue/Broker.cs

Lines changed: 38 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,10 @@ namespace InEngine.Core.Queue
1010
public class Broker
1111
{
1212
public string QueueBaseName { get; set; } = "InEngine:Queue";
13-
public string PrimaryWaitingQueueName { get { return QueueBaseName + ":Primary:Waiting"; } }
14-
public string PrimaryProcessingQueueName { get { return QueueBaseName + ":Primary:Processing"; } }
15-
public string PrimaryFailedQueueName { get { return QueueBaseName + ":Primary:Failed"; } }
16-
public string SecondaryWaitingQueueName { get { return QueueBaseName + ":Secondary:Waiting"; } }
17-
public string SecondaryProcessingQueueName { get { return QueueBaseName + ":Secondary:Processing"; } }
18-
public string SecondaryFailedQueueName { get { return QueueBaseName + ":Secondary:Failed"; } }
13+
public string QueueName { get; internal set; } = "Primary";
14+
public string PendingQueueName { get { return QueueBaseName + $":{QueueName}:Pending"; } }
15+
public string InProgressQueueName { get { return QueueBaseName + $":{QueueName}:InProgress"; } }
16+
public string FailedQueueName { get { return QueueBaseName + $":{QueueName}:Failed"; } }
1917
public static Lazy<ConnectionMultiplexer> lazyConnection = new Lazy<ConnectionMultiplexer>(() => {
2018
var queueSettings = InEngineSettings.Make().Queue;
2119
var redisConfig = ConfigurationOptions.Parse($"{queueSettings.RedisHost}:{queueSettings.RedisPort}");
@@ -39,17 +37,26 @@ public IDatabase Redis
3937
public static int RedisPort { get; set; }
4038
public static string RedisPassword { get; set; }
4139

42-
public static Broker Make()
40+
public Broker()
41+
{}
42+
43+
public Broker(bool useSecondaryQueue) : this()
44+
{
45+
if (useSecondaryQueue)
46+
QueueName = "Secondary";
47+
}
48+
49+
public static Broker Make(bool useSecondaryQueue = false)
4350
{
44-
return new Broker() {
51+
return new Broker(useSecondaryQueue) {
4552
QueueBaseName = InEngineSettings.Make().Queue.QueueName
4653
};
4754
}
4855

49-
public void Publish(ICommand command, bool useSecondaryQueue = false)
56+
public void Publish(ICommand command)
5057
{
5158
Redis.ListLeftPush(
52-
useSecondaryQueue ? SecondaryWaitingQueueName : PrimaryWaitingQueueName,
59+
PendingQueueName,
5360
new Message() {
5461
CommandClassName = command.GetType().FullName,
5562
CommandAssemblyName = command.GetType().Assembly.GetName().Name + ".dll",
@@ -58,13 +65,9 @@ public void Publish(ICommand command, bool useSecondaryQueue = false)
5865
);
5966
}
6067

61-
public bool Consume(bool useSecondaryQueue = false)
68+
public bool Consume()
6269
{
63-
var waitingQueueName = useSecondaryQueue ? SecondaryWaitingQueueName : PrimaryWaitingQueueName;
64-
var processingQueueName = useSecondaryQueue ? SecondaryProcessingQueueName : PrimaryProcessingQueueName;
65-
var failedQueueName = useSecondaryQueue ? SecondaryFailedQueueName : PrimaryFailedQueueName;
66-
67-
var stageMessageTask = Redis.ListRightPopLeftPush(waitingQueueName, processingQueueName);
70+
var stageMessageTask = Redis.ListRightPopLeftPush(PendingQueueName, InProgressQueueName);
6871
var serializedMessage = stageMessageTask.ToString();
6972
if (serializedMessage == null)
7073
return false;
@@ -83,94 +86,58 @@ public bool Consume(bool useSecondaryQueue = false)
8386
}
8487
catch (Exception exception)
8588
{
86-
Redis.ListRemove(processingQueueName, serializedMessage, 1);
87-
Redis.ListLeftPush(failedQueueName, stageMessageTask);
89+
Redis.ListRemove(InProgressQueueName, serializedMessage, 1);
90+
Redis.ListLeftPush(FailedQueueName, stageMessageTask);
8891
throw new CommandFailedException("Consumed command failed.", exception);
8992
}
9093

9194
try
9295
{
93-
Redis.ListRemove(processingQueueName, serializedMessage, 1);
96+
Redis.ListRemove(InProgressQueueName, serializedMessage, 1);
9497
}
9598
catch (Exception exception)
9699
{
97-
throw new CommandFailedException($"Failed to remove completed message from queue: {processingQueueName}", exception);
100+
throw new CommandFailedException($"Failed to remove completed message from queue: {InProgressQueueName}", exception);
98101
}
99102

100103
return true;
101104
}
102105

103-
#region Primary Queue Management Methods
104-
public long GetPrimaryWaitingQueueLength()
105-
{
106-
return Redis.ListLength(PrimaryWaitingQueueName);
107-
}
108-
109-
public long GetPrimaryProcessingQueueLength()
110-
{
111-
return Redis.ListLength(PrimaryProcessingQueueName);
112-
}
113-
114-
public long GetPrimaryFailedQueueLength()
115-
{
116-
return Redis.ListLength(PrimaryFailedQueueName);
117-
}
118-
119-
public bool ClearPrimaryWaitingQueue()
106+
#region Queue Management Methods
107+
public long GetPendingQueueLength()
120108
{
121-
return Redis.KeyDelete(PrimaryWaitingQueueName);
109+
return Redis.ListLength(PendingQueueName);
122110
}
123111

124-
public bool ClearPrimaryProcessingQueue()
112+
public long GetInProgressQueueLength()
125113
{
126-
return Redis.KeyDelete(PrimaryProcessingQueueName);
114+
return Redis.ListLength(InProgressQueueName);
127115
}
128116

129-
public bool ClearPrimaryFailedQueue()
117+
public long GetFailedQueueLength()
130118
{
131-
return Redis.KeyDelete(PrimaryFailedQueueName);
119+
return Redis.ListLength(FailedQueueName);
132120
}
133-
#endregion
134121

135-
#region Secondary Queue Management Methods
136-
public long GetSecondaryWaitingQueueLength()
122+
public bool ClearPendingQueue()
137123
{
138-
return Redis.ListLength(SecondaryWaitingQueueName);
124+
return Redis.KeyDelete(PendingQueueName);
139125
}
140126

141-
public long GetSecondaryProcessingQueueLength()
127+
public bool ClearInProgressQueue()
142128
{
143-
return Redis.ListLength(SecondaryProcessingQueueName);
129+
return Redis.KeyDelete(InProgressQueueName);
144130
}
145131

146-
public long GetSecondaryFailedQueueLength()
132+
public bool ClearFailedQueue()
147133
{
148-
return Redis.ListLength(SecondaryFailedQueueName);
134+
return Redis.KeyDelete(FailedQueueName);
149135
}
150136

151-
public bool ClearSecondaryWaitingQueue()
137+
public void RepublishFailedMessages()
152138
{
153-
return Redis.KeyDelete(SecondaryWaitingQueueName);
154-
}
155-
156-
public bool ClearSecondaryProcessingQueue()
157-
{
158-
return Redis.KeyDelete(SecondaryProcessingQueueName);
159-
}
160-
161-
162-
public bool ClearSecondaryFailedQueue()
163-
{
164-
return Redis.KeyDelete(SecondaryFailedQueueName);
139+
Redis.ListRightPopLeftPush(FailedQueueName, PendingQueueName);
165140
}
166141
#endregion
167-
168-
public void RepublishFailedMessages(bool useSecondaryQueue)
169-
{
170-
Redis.ListRightPopLeftPush(
171-
useSecondaryQueue ? SecondaryFailedQueueName : PrimaryFailedQueueName,
172-
useSecondaryQueue ? SecondaryWaitingQueueName : PrimaryWaitingQueueName
173-
);
174-
}
175142
}
176143
}

src/InEngine.Core/Queue/Commands/ClearAll.cs

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,24 @@ namespace InEngine.Core.Queue.Commands
55
{
66
public class ClearAll : AbstractCommand
77
{
8-
[Option("failed-queue", HelpText = "Clear the failed queue.")]
8+
[Option("failed", HelpText = "Clear the failed queue.")]
99
public bool ClearFailedQueue { get; set; }
1010

11-
[Option("processing-queue", HelpText = "Clear the processing queue.")]
11+
[Option("in-progress", HelpText = "Clear the processing queue.")]
1212
public bool ClearProcessingQueue { get; set; }
1313

1414
[Option("secondary", HelpText = "Clear the secondary queue.")]
1515
public bool UseSecondaryQueue { get; set; }
1616

1717
public override void Run()
1818
{
19-
var broker = Broker.Make();
20-
if (UseSecondaryQueue) {
21-
if (ClearProcessingQueue)
22-
Info(broker.ClearSecondaryProcessingQueue().ToString());
23-
else if (ClearFailedQueue)
24-
Info(broker.ClearSecondaryFailedQueue().ToString());
25-
else
26-
Info(broker.ClearSecondaryWaitingQueue().ToString());
27-
} else {
28-
if (ClearProcessingQueue)
29-
Info(broker.ClearPrimaryProcessingQueue().ToString());
30-
else if (ClearFailedQueue)
31-
Info(broker.ClearPrimaryFailedQueue().ToString());
32-
else
33-
Info(broker.ClearPrimaryWaitingQueue().ToString());
34-
}
19+
var broker = Broker.Make(UseSecondaryQueue);
20+
if (ClearProcessingQueue)
21+
Info(broker.ClearInProgressQueue().ToString());
22+
else if (ClearFailedQueue)
23+
Info(broker.ClearFailedQueue().ToString());
24+
else
25+
Info(broker.ClearPendingQueue().ToString());
3526
}
3627
}
3728
}

src/InEngine.Core/Queue/Commands/Consume.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ public class Consume : AbstractCommand
1515
public override void Run()
1616
{
1717
UseSecondaryQueue = UseSecondaryQueue || GetJobContextData<bool>("useSecondaryQueue");
18-
var broker = Broker.Make();
18+
var broker = Broker.Make(UseSecondaryQueue);
1919
var shouldConsume = true;
2020
while (shouldConsume)
21-
shouldConsume = broker.Consume(UseSecondaryQueue) && ShouldConsumeAll;
21+
shouldConsume = broker.Consume() && ShouldConsumeAll;
2222
}
2323

2424
public override void Failed(Exception exception)

src/InEngine.Core/Queue/Commands/Length.cs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,21 @@ public class Length : AbstractCommand
77
{
88
public override void Run()
99
{
10-
var broker = Broker.Make();
11-
var leftPadding = 15;
12-
Warning("Primary Queue:");
13-
InfoText("Pending".PadLeft(leftPadding));
14-
Line(broker.GetPrimaryWaitingQueueLength().ToString().PadLeft(10));
15-
InfoText("In-progress".PadLeft(leftPadding));
16-
Line(broker.GetPrimaryProcessingQueueLength().ToString().PadLeft(10));
17-
ErrorText("Failed".PadLeft(leftPadding));
18-
Line(broker.GetPrimaryFailedQueueLength().ToString().PadLeft(10));
19-
Newline();
10+
PrintUsage(Broker.Make());
11+
PrintUsage(Broker.Make(true));
12+
}
2013

21-
Warning("Secondary Queue:");
14+
public void PrintUsage(Broker broker)
15+
{
16+
var leftPadding = 15;
17+
Warning($"{broker.QueueName} Queue:");
18+
broker = Broker.Make(true);
2219
InfoText("Pending".PadLeft(leftPadding));
23-
Line(broker.GetSecondaryWaitingQueueLength().ToString().PadLeft(10));
20+
Line(broker.GetPendingQueueLength().ToString().PadLeft(10));
2421
InfoText("In-progress".PadLeft(leftPadding));
25-
Line(broker.GetSecondaryProcessingQueueLength().ToString().PadLeft(10));
22+
Line(broker.GetInProgressQueueLength().ToString().PadLeft(10));
2623
ErrorText("Failed".PadLeft(leftPadding));
27-
Line(broker.GetSecondaryFailedQueueLength().ToString().PadLeft(10));
24+
Line(broker.GetFailedQueueLength().ToString().PadLeft(10));
2825
Newline();
2926
}
3027
}

src/InEngine.Core/Queue/Commands/Publish.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public override void Run()
3030
if (Arguments != null)
3131
Parser.Default.ParseArguments(Arguments.ToList().Select(x => $"--{x}").ToArray(), command);
3232

33-
Broker.Make().Publish(command, UseSecondaryQueue);
33+
Broker.Make(UseSecondaryQueue).Publish(command);
3434
}
3535

3636
public override void Failed(Exception exception)

src/InEngine.Core/Queue/Commands/RepublishFailed.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ public class RepublishFailed : AbstractCommand
1414

1515
public override void Run()
1616
{
17-
var broker = Broker.Make();
17+
var broker = Broker.Make(UseSecondaryQueue);
1818
Enumerable.Range(0, Count)
1919
.ToList()
20-
.ForEach(x => broker.RepublishFailedMessages(UseSecondaryQueue));
20+
.ForEach(x => broker.RepublishFailedMessages());
2121
}
2222
}
2323
}

0 commit comments

Comments
 (0)