Skip to content

Commit 6ce5206

Browse files
committed
Add queue peek command
1 parent c3b09f9 commit 6ce5206

File tree

5 files changed

+96
-10
lines changed

5 files changed

+96
-10
lines changed

src/InEngine.Core/InEngine.Core.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
<PackageReference Include="CommandLineParser" Version="1.9.71" />
99
<PackageReference Include="NLog" Version="4.4.12" />
1010
<PackageReference Include="Quartz" Version="2.6.1" />
11-
<PackageReference Include="Goblinfactory.Konsole.ProgressBar.Core" Version="1.0.2" />
1211
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.0.0" />
1312
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.0.0" />
1413
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="2.0.0" />
1514
<PackageReference Include="StackExchange.Redis" Version="1.2.4" />
15+
<PackageReference Include="Goblinfactory.Konsole" Version="3.1.0" />
1616
</ItemGroup>
1717
<ItemGroup>
1818
<Compile Include="..\SharedAssemblyInfo.cs">

src/InEngine.Core/Queue/Broker.cs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
24
using System.Reflection;
35
using System.Threading.Tasks;
46
using InEngine.Core.Exceptions;
@@ -74,11 +76,7 @@ public bool Consume()
7476
var message = serializedMessage.DeserializeFromJson<Message>();
7577
if (message == null)
7678
return false;
77-
78-
var commandType = Type.GetType($"{message.CommandClassName}, {message.CommandAssemblyName}");
79-
if (commandType == null)
80-
throw new CommandFailedException("Consumed command failed: could not locate command type.");
81-
var commandInstance = JsonConvert.DeserializeObject(message.SerializedCommand, commandType) as ICommand;
79+
var commandInstance = ExtractCommandInstanceFromMessage(message);
8280

8381
try
8482
{
@@ -103,6 +101,14 @@ public bool Consume()
103101
return true;
104102
}
105103

104+
public ICommand ExtractCommandInstanceFromMessage(Message message)
105+
{
106+
var commandType = Type.GetType($"{message.CommandClassName}, {message.CommandAssemblyName}");
107+
if (commandType == null)
108+
throw new CommandFailedException("Could not locate command type.");
109+
return JsonConvert.DeserializeObject(message.SerializedCommand, commandType) as ICommand;
110+
}
111+
106112
#region Queue Management Methods
107113
public long GetPendingQueueLength()
108114
{
@@ -138,6 +144,26 @@ public void RepublishFailedMessages()
138144
{
139145
Redis.ListRightPopLeftPush(FailedQueueName, PendingQueueName);
140146
}
147+
148+
public List<Message> PeekPendingMessages(long from, long to)
149+
{
150+
return GetMessages(PendingQueueName, from, to);
151+
}
152+
153+
public List<Message> PeekInProgressMessages(long from, long to)
154+
{
155+
return GetMessages(InProgressQueueName, from, to);
156+
}
157+
158+
public List<Message> PeekFailedMessages(long from, long to)
159+
{
160+
return GetMessages(FailedQueueName, from, to);
161+
}
162+
163+
public List<Message> GetMessages(string queueName, long from, long to)
164+
{
165+
return Redis.ListRange(queueName, from, to).ToStringArray().Select(x => x.DeserializeFromJson<Message>()).ToList();
166+
}
141167
#endregion
142168
}
143169
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using System;
2+
using CommandLine;
3+
using InEngine.Core.Exceptions;
4+
using Konsole;
5+
using Konsole.Forms;
6+
using Newtonsoft.Json;
7+
8+
namespace InEngine.Core.Queue.Commands
9+
{
10+
public class Peek : AbstractCommand
11+
{
12+
[Option("offset", DefaultValue = 0, HelpText = "The maximum number of messages to peek.")]
13+
public long Offset { get; set; }
14+
15+
[Option("limit", DefaultValue = 10, HelpText = "The maximum number of messages to peek.")]
16+
public long Limit { get; set; }
17+
18+
[Option("json", HelpText = "View the messages as JSON.")]
19+
public bool JsonFormat { get; set; }
20+
21+
[Option("pending", HelpText = "Peek at messages in the pending queue.")]
22+
public bool PendingQueue { get; set; }
23+
24+
[Option("failed", HelpText = "Peek at messages in the failed queue.")]
25+
public bool FailedQueue { get; set; }
26+
27+
[Option("in-progress", HelpText = "Peek at messages in the in-progress queue.")]
28+
public bool InProgressQueue { get; set; }
29+
30+
[Option("secondary", HelpText = "Peek at messages in secondary queues. Primary queues are used by default.")]
31+
public bool UseSecondaryQueue { get; set; }
32+
33+
public override void Run()
34+
{
35+
if (PendingQueue == false && FailedQueue == false && InProgressQueue == false)
36+
throw new CommandFailedException("Must specify at least one queue to peek in. Use -h to see available options.");
37+
var broker = Broker.Make(UseSecondaryQueue);
38+
var from = Offset;
39+
var to = Offset + Limit - 1;
40+
if (PendingQueue) {
41+
Warning("Pending:");
42+
var konsoleForm = new Form(120, new ThinBoxStyle());
43+
broker.PeekPendingMessages(from, to).ForEach(x => {
44+
var message = x as Message;
45+
if (JsonFormat)
46+
Line(message.SerializeToJson());
47+
else
48+
konsoleForm.Write(broker.ExtractCommandInstanceFromMessage(message));
49+
});
50+
}
51+
if (InProgressQueue)
52+
Info($"In-progress: {broker.PeekInProgressMessages(from, to).ToString()}");
53+
if (FailedQueue)
54+
Info($"Failed: {broker.PeekFailedMessages(from, to).ToString()}");
55+
}
56+
}
57+
}

src/InEngine.Core/Queue/Commands/RepublishFailed.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ namespace InEngine.Core.Queue.Commands
66
{
77
public class RepublishFailed : AbstractCommand
88
{
9-
[Option("count", DefaultValue = 3, HelpText = "The maximum number of messages to republish.")]
10-
public int Count { get; set; }
9+
[Option("limit", DefaultValue = 100, HelpText = "The maximum number of messages to republish.")]
10+
public int Limit { get; set; }
1111

1212
[Option("secondary", DefaultValue = false, HelpText = "Republish failed secondary queue messages.")]
1313
public bool UseSecondaryQueue { get; set; }
1414

1515
public override void Run()
1616
{
1717
var broker = Broker.Make(UseSecondaryQueue);
18-
Enumerable.Range(0, Count)
18+
Enumerable.Range(0, Limit)
1919
.ToList()
2020
.ForEach(x => broker.RepublishFailedMessages());
2121
}

src/InEngine.Core/Queue/Options.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@ public class Options : IOptions
1515
[VerbOption("queue:length", HelpText = "Get the number of messages in the primary and secondary queues.")]
1616
public Length Length { get; set; }
1717

18-
[VerbOption("queue:flush", HelpText = "Clear the primary or secondary queues, and optionally.")]
18+
[VerbOption("queue:flush", HelpText = "Clear the primary or secondary queues.")]
1919
public Flush Flush { get; set; }
2020

2121
[VerbOption("queue:republish", HelpText = "Republish failed messages to the queue.")]
2222
public RepublishFailed RepublishFailed { get; set; }
2323

24+
[VerbOption("queue:peek", HelpText = "Peek at messages in the primary or secondary queues.")]
25+
public Peek Peek { get; set; }
26+
2427
[HelpVerbOption]
2528
public string GetUsage(string verb)
2629
{

0 commit comments

Comments
 (0)