Skip to content

Commit 67429da

Browse files
committed
Add failed queue management commands
1 parent dabe180 commit 67429da

File tree

4 files changed

+63
-12
lines changed

4 files changed

+63
-12
lines changed

src/InEngine.Core/Queue/Broker.cs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,7 @@ public bool Consume(bool useSecondaryQueue = false)
8484
catch (Exception exception)
8585
{
8686
Redis.ListRemove(processingQueueName, serializedMessage, 1);
87-
Redis.ListLeftPush(
88-
failedQueueName,
89-
commandInstance.SerializeToJson()
90-
);
87+
Redis.ListLeftPush(failedQueueName, stageMessageTask);
9188
throw new CommandFailedException("Consumed command failed.", exception);
9289
}
9390

@@ -128,6 +125,11 @@ public bool ClearPrimaryProcessingQueue()
128125
{
129126
return Redis.KeyDelete(PrimaryProcessingQueueName);
130127
}
128+
129+
public bool ClearPrimaryFailedQueue()
130+
{
131+
return Redis.KeyDelete(PrimaryFailedQueueName);
132+
}
131133
#endregion
132134

133135
#region Secondary Queue Management Methods
@@ -155,6 +157,20 @@ public bool ClearSecondaryProcessingQueue()
155157
{
156158
return Redis.KeyDelete(SecondaryProcessingQueueName);
157159
}
160+
161+
162+
public bool ClearSecondaryFailedQueue()
163+
{
164+
return Redis.KeyDelete(SecondaryFailedQueueName);
165+
}
158166
#endregion
167+
168+
public void RepublishFailedMessages(bool useSecondaryQueue)
169+
{
170+
Redis.ListRightPopLeftPush(
171+
useSecondaryQueue ? SecondaryFailedQueueName : PrimaryFailedQueueName,
172+
useSecondaryQueue ? SecondaryWaitingQueueName : PrimaryWaitingQueueName
173+
);
174+
}
159175
}
160176
}

src/InEngine.Core/Queue/Commands/ClearAll.cs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ namespace InEngine.Core.Queue.Commands
55
{
66
public class ClearAll : AbstractCommand
77
{
8+
[Option("failed-queue", HelpText = "Clear the failed queue.")]
9+
public bool ClearFailedQueue { get; set; }
10+
811
[Option("processing-queue", HelpText = "Clear the processing queue.")]
912
public bool ClearProcessingQueue { get; set; }
1013

@@ -15,13 +18,19 @@ public override void Run()
1518
{
1619
var broker = Broker.Make();
1720
if (UseSecondaryQueue) {
18-
Console.WriteLine(ClearProcessingQueue ?
19-
broker.ClearSecondaryProcessingQueue() :
20-
broker.ClearSecondaryWaitingQueue());
21+
if (ClearProcessingQueue)
22+
Info(broker.ClearSecondaryProcessingQueue().ToString());
23+
else if (ClearFailedQueue)
24+
Info(broker.ClearSecondaryFailedQueue().ToString());
25+
else
26+
Info(broker.ClearSecondaryWaitingQueue().ToString());
2127
} else {
22-
Console.WriteLine(ClearProcessingQueue ?
23-
broker.ClearPrimaryProcessingQueue() :
24-
broker.ClearPrimaryWaitingQueue());
28+
if (ClearProcessingQueue)
29+
Info(broker.ClearPrimaryProcessingQueue().ToString());
30+
else if (ClearFailedQueue)
31+
Info(broker.ClearPrimaryFailedQueue().ToString());
32+
else
33+
Info(broker.ClearPrimaryWaitingQueue().ToString());
2534
}
2635
}
2736
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using System;
2+
using System.Linq;
3+
using CommandLine;
4+
5+
namespace InEngine.Core.Queue.Commands
6+
{
7+
public class RepublishFailed : AbstractCommand
8+
{
9+
[Option("count", DefaultValue = 3, HelpText = "The maximum number of messages to republish.")]
10+
public int Count { get; set; }
11+
12+
[Option("secondary", DefaultValue = false, HelpText = "Republish failed secondary queue messages.")]
13+
public bool UseSecondaryQueue { get; set; }
14+
15+
public override void Run()
16+
{
17+
var broker = Broker.Make();
18+
Enumerable.Range(0, Count)
19+
.ToList()
20+
.ForEach(x => broker.RepublishFailedMessages(UseSecondaryQueue));
21+
}
22+
}
23+
}

src/InEngine.Core/Queue/Options.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@ public class Options : IOptions
1515
[VerbOption("queue:length", HelpText = "Get the number of messages in the primary and secondary queues.")]
1616
public Length Length { get; set; }
1717

18-
[VerbOption("queue:clear", HelpText = "Clear the primary and secondary queues.")]
19-
public ClearAll QueueClearAll { get; set; }
18+
[VerbOption("queue:clear", HelpText = "Clear the primary or secondary queues, and optionally.")]
19+
public ClearAll ClearAll { get; set; }
20+
21+
[VerbOption("queue:republish", HelpText = "Republish failed messages to the queue.")]
22+
public RepublishFailed RepublishFailed { get; set; }
2023

2124
[HelpVerbOption]
2225
public string GetUsage(string verb)

0 commit comments

Comments
 (0)