Skip to content

Commit 3d9f422

Browse files
committed
Rework queue length command
1 parent 5141e1e commit 3d9f422

File tree

7 files changed

+44
-121
lines changed

7 files changed

+44
-121
lines changed

src/InEngine.Core/Queuing/Clients/FileClient.cs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -151,21 +151,6 @@ public void RepublishFailedMessages()
151151
public void Recover()
152152
{}
153153

154-
public long GetFailedQueueLength()
155-
{
156-
return new DirectoryInfo(FailedQueuePath).GetFiles().LongCount();
157-
}
158-
159-
public long GetInProgressQueueLength()
160-
{
161-
return new DirectoryInfo(InProgressQueuePath).GetFiles().LongCount();
162-
}
163-
164-
public long GetPendingQueueLength()
165-
{
166-
return new DirectoryInfo(PendingQueuePath).GetFiles().LongCount();
167-
}
168-
169154
public bool ClearFailedQueue()
170155
{
171156
return ClearQueue(FailedQueuePath);
@@ -221,5 +206,14 @@ public List<ICommandEnvelope> PeekMessages(string queuePath, long from, long to)
221206
.Select(x => File.ReadAllText(x.FullName).DeserializeFromJson<CommandEnvelope>() as ICommandEnvelope)
222207
.ToList();
223208
}
209+
210+
public Dictionary<string, long> GetQueueLengths()
211+
{
212+
return new Dictionary<string, long>() {
213+
{"Pending", new DirectoryInfo(PendingQueuePath).GetFiles().LongCount()},
214+
{"In-progress", new DirectoryInfo(InProgressQueuePath).GetFiles().LongCount()},
215+
{"Failed", new DirectoryInfo(FailedQueuePath).GetFiles().LongCount()}
216+
};
217+
}
224218
}
225219
}

src/InEngine.Core/Queuing/Clients/RabbitMQClient.cs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -170,29 +170,12 @@ public bool ClearFailedQueue()
170170
return Channel.QueuePurge(FailedQueueName) > 0;
171171
}
172172

173-
public long GetPendingQueueLength()
174-
{
175-
InitChannel();
176-
return Channel.MessageCount(PendingQueueName);
177-
}
178-
179-
public long GetFailedQueueLength()
180-
{
181-
InitChannel();
182-
return Channel.MessageCount(FailedQueueName);
183-
}
184-
185173
#region Not implemented
186174
public bool ClearInProgressQueue()
187175
{
188176
throw new NotImplementedException();
189177
}
190178

191-
public long GetInProgressQueueLength()
192-
{
193-
throw new NotImplementedException();
194-
}
195-
196179
public List<ICommandEnvelope> PeekFailedMessages(long from, long to)
197180
{
198181
throw new NotImplementedException();
@@ -220,5 +203,14 @@ public void Dispose()
220203
if (Connection != null && Connection.IsOpen)
221204
Connection.Close();
222205
}
206+
207+
public Dictionary<string, long> GetQueueLengths()
208+
{
209+
InitChannel();
210+
return new Dictionary<string, long>() {
211+
{"Pending", Channel.MessageCount(PendingQueueName)},
212+
{"Failed", Channel.MessageCount(FailedQueueName)}
213+
};
214+
}
223215
}
224216
}

src/InEngine.Core/Queuing/Clients/RedisClient.cs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ public class RedisClient : IQueueClient
3434
public ConnectionMultiplexer _connectionMultiplexer;
3535
public IDatabase Redis { get { return Connection.GetDatabase(ClientSettings.Database); } }
3636
public bool UseCompression { get; set; }
37-
3837
public RedisChannel RedisChannel { get; set; }
3938

4039
public void InitChannel()
@@ -135,21 +134,6 @@ public ICommandEnvelope Consume()
135134
return commandEnvelope;
136135
}
137136

138-
public long GetPendingQueueLength()
139-
{
140-
return Redis.ListLength(PendingQueueName);
141-
}
142-
143-
public long GetInProgressQueueLength()
144-
{
145-
return Redis.ListLength(InProgressQueueName);
146-
}
147-
148-
public long GetFailedQueueLength()
149-
{
150-
return Redis.ListLength(FailedQueueName);
151-
}
152-
153137
public bool ClearPendingQueue()
154138
{
155139
return Redis.KeyDelete(PendingQueueName);
@@ -191,5 +175,14 @@ public List<ICommandEnvelope> GetMessages(string queueName, long from, long to)
191175
.ToStringArray()
192176
.Select(x => x.DeserializeFromJson<CommandEnvelope>() as ICommandEnvelope).ToList();
193177
}
178+
179+
public Dictionary<string, long> GetQueueLengths()
180+
{
181+
return new Dictionary<string, long> {
182+
{"Pending", Redis.ListLength(PendingQueueName)},
183+
{"In-progress", Redis.ListLength(InProgressQueueName)},
184+
{"Failed", Redis.ListLength(FailedQueueName)}
185+
};
186+
}
194187
}
195188
}

src/InEngine.Core/Queuing/Clients/SyncClient.cs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,6 @@ public bool ClearPendingQueue()
4747
throw new NotImplementedException();
4848
}
4949

50-
public long GetFailedQueueLength()
51-
{
52-
throw new NotImplementedException();
53-
}
54-
55-
public long GetInProgressQueueLength()
56-
{
57-
throw new NotImplementedException();
58-
}
59-
60-
public long GetPendingQueueLength()
61-
{
62-
throw new NotImplementedException();
63-
}
64-
6550
public List<ICommandEnvelope> PeekFailedMessages(long from, long to)
6651
{
6752
throw new NotImplementedException();
@@ -81,5 +66,10 @@ public void RepublishFailedMessages()
8166
{
8267
throw new NotImplementedException();
8368
}
69+
70+
public Dictionary<string, long> GetQueueLengths()
71+
{
72+
return new Dictionary<string, long>();
73+
}
8474
}
8575
}
Lines changed: 6 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System.Linq;
22

33
namespace InEngine.Core.Queuing.Commands
44
{
@@ -12,46 +12,12 @@ public override void Run()
1212

1313
public void PrintQueueLengths(QueueAdapter queue)
1414
{
15-
var textLeftPadding = 15;
16-
var numberLeftPadding = 2;
1715
Warning($"{queue.QueueName} Queue:");
18-
19-
try
20-
{
21-
InfoText("".PadLeft(numberLeftPadding));
22-
InfoText("Pending");
23-
InfoText("".PadLeft(textLeftPadding));
24-
Line(queue.GetPendingQueueLength().ToString());
25-
}
26-
catch (NotImplementedException)
27-
{
28-
Error("Not supported by queue client.");
29-
}
30-
31-
try
32-
{
33-
InfoText("".PadLeft(numberLeftPadding));
34-
InfoText("In-progress");
35-
InfoText("".PadLeft(textLeftPadding));
36-
Line(queue.GetInProgressQueueLength().ToString());
37-
}
38-
catch (NotImplementedException)
39-
{
40-
Error("Not supported by queue client.");
41-
}
42-
43-
try
44-
{
45-
InfoText("".PadLeft(numberLeftPadding));
46-
InfoText("Failed");
47-
InfoText("".PadLeft(textLeftPadding));
48-
Line(queue.GetFailedQueueLength().ToString());
49-
}
50-
catch (NotImplementedException)
51-
{
52-
Error("Not supported by queue client.");
53-
}
16+
queue.GetQueueLengths().ToList().ForEach(x => {
17+
InfoText(x.Key.PadLeft(15));
18+
Line(x.Value.ToString().PadLeft(10));
19+
});
5420
Newline();
5521
}
5622
}
57-
}
23+
}

src/InEngine.Core/Queuing/IQueueClient.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@ public interface IQueueClient
1616
void Consume(CancellationToken cancellationToken);
1717
ICommandEnvelope Consume();
1818
void Recover();
19-
long GetPendingQueueLength();
20-
long GetInProgressQueueLength();
21-
long GetFailedQueueLength();
19+
Dictionary<string, long> GetQueueLengths();
2220
bool ClearPendingQueue();
2321
bool ClearInProgressQueue();
2422
bool ClearFailedQueue();

src/InEngine.Core/Queuing/QueueAdapter.cs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,21 +73,6 @@ public void Recover()
7373
QueueClient.Recover();
7474
}
7575

76-
public long GetPendingQueueLength()
77-
{
78-
return QueueClient.GetPendingQueueLength();
79-
}
80-
81-
public long GetInProgressQueueLength()
82-
{
83-
return QueueClient.GetInProgressQueueLength();
84-
}
85-
86-
public long GetFailedQueueLength()
87-
{
88-
return QueueClient.GetFailedQueueLength();
89-
}
90-
9176
public bool ClearPendingQueue()
9277
{
9378
return QueueClient.ClearPendingQueue();
@@ -122,5 +107,10 @@ public List<ICommandEnvelope> PeekFailedMessages(long from, long to)
122107
{
123108
return QueueClient.PeekFailedMessages(from, to);
124109
}
110+
111+
public Dictionary<string, long> GetQueueLengths()
112+
{
113+
return QueueClient.GetQueueLengths();
114+
}
125115
}
126116
}

0 commit comments

Comments
 (0)