Skip to content

Commit 1cd76eb

Browse files
feat: add polling to get the number of active queues (#155)
* add polling to get the number of active queues * All three repositories have the feature implemented
1 parent 25f7a06 commit 1cd76eb

33 files changed

+566
-134
lines changed

KafkaFlow.Retry.sln

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ EndProject
3030
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{9557F908-472F-4872-BCF8-8EC028EFDA9B}"
3131
ProjectSection(SolutionItems) = preProject
3232
Directory.Build.props = Directory.Build.props
33+
README.md = README.md
3334
EndProjectSection
3435
EndProject
3536
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Retry.SchemaRegistry.Sample", "samples\KafkaFlow.Retry.SchemaRegistry.Sample\KafkaFlow.Retry.SchemaRegistry.Sample.csproj", "{510D65E8-B62C-402C-9CE3-47C7055A29FF}"
@@ -47,9 +48,6 @@ EndProject
4748
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{0192C262-63AF-4918-B142-EC07DBB9E501}"
4849
EndProject
4950
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{18F1AB11-9DC0-442F-9B6D-7098A93727B8}"
50-
ProjectSection(SolutionItems) = preProject
51-
CodeCoverage.runsettings = tests\CodeCoverage.runsettings
52-
EndProjectSection
5351
EndProject
5452
Global
5553
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -106,21 +104,21 @@ Global
106104
HideSolutionNode = FALSE
107105
EndGlobalSection
108106
GlobalSection(NestedProjects) = preSolution
107+
{5AE4E956-15A8-4117-9A9D-97B53060FE4C} = {0192C262-63AF-4918-B142-EC07DBB9E501}
108+
{1DF1AB0D-37CB-4AFC-B701-8F0F2B260E54} = {0192C262-63AF-4918-B142-EC07DBB9E501}
109+
{90B43DD5-C3CE-4BF8-B63E-3A34B962DC96} = {0192C262-63AF-4918-B142-EC07DBB9E501}
109110
{9DC50EFE-C511-4DEC-A8EA-199795CEFE01} = {1DF1AB0D-37CB-4AFC-B701-8F0F2B260E54}
110111
{745CB854-1FFE-4C60-AD15-69A3A784CC9F} = {B1D2A20A-0742-4D8E-A773-66EB95560152}
111112
{C61CCF7F-E7C8-4FEF-9E7E-22AB3ECD148D} = {90B43DD5-C3CE-4BF8-B63E-3A34B962DC96}
112113
{F06DD63E-8965-4C43-BEEE-4ABBB5914FF8} = {90B43DD5-C3CE-4BF8-B63E-3A34B962DC96}
113114
{D3664EBB-D77B-42C2-AF90-7B2F3E354C3F} = {5AE4E956-15A8-4117-9A9D-97B53060FE4C}
114115
{F27309CD-D796-425B-B5D6-780B7B57E9C7} = {B1D2A20A-0742-4D8E-A773-66EB95560152}
116+
{9E3B34BA-E309-4DA4-93D4-C0DD72D4711D} = {18F1AB11-9DC0-442F-9B6D-7098A93727B8}
117+
{A25A5E30-8D5A-40DB-BA21-7A5B4FB44DE0} = {18F1AB11-9DC0-442F-9B6D-7098A93727B8}
115118
{510D65E8-B62C-402C-9CE3-47C7055A29FF} = {B1D2A20A-0742-4D8E-A773-66EB95560152}
116119
{B14C5859-85C5-4E2F-80C7-D8B29E36481A} = {B1D2A20A-0742-4D8E-A773-66EB95560152}
117120
{2A0BC610-E0FE-4BC3-B232-A8B918BE7381} = {9557F908-472F-4872-BCF8-8EC028EFDA9B}
118121
{B7E4C23D-48DC-4056-8658-19D54AF7008A} = {90B43DD5-C3CE-4BF8-B63E-3A34B962DC96}
119-
{A25A5E30-8D5A-40DB-BA21-7A5B4FB44DE0} = {18F1AB11-9DC0-442F-9B6D-7098A93727B8}
120-
{9E3B34BA-E309-4DA4-93D4-C0DD72D4711D} = {18F1AB11-9DC0-442F-9B6D-7098A93727B8}
121-
{90B43DD5-C3CE-4BF8-B63E-3A34B962DC96} = {0192C262-63AF-4918-B142-EC07DBB9E501}
122-
{1DF1AB0D-37CB-4AFC-B701-8F0F2B260E54} = {0192C262-63AF-4918-B142-EC07DBB9E501}
123-
{5AE4E956-15A8-4117-9A9D-97B53060FE4C} = {0192C262-63AF-4918-B142-EC07DBB9E501}
124122
EndGlobalSection
125123
GlobalSection(ExtensibilityGlobals) = postSolution
126124
SolutionGuid = {A953E534-FBCA-4F30-9CA5-96F67C1A49D8}

README.md

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,9 @@ Read our [contributing guidelines](CONTRIBUTING.md) to learn about our developme
2828

2929
## Maintainers
3030

31-
- [Bruno Gomes](https://github.com/brunohfgomes)
32-
- [Carlos Miranda](https://github.com/carlosgoias)
33-
- [Fernando Marins](https://github.com/fernando-a-marins)
34-
- [Leandro Magalhães](https://github.com/spookylsm)
3531
- [Luís Garcês](https://github.com/luispfgarces)
3632
- [Martinho Novais](https://github.com/martinhonovais)
3733
- [Rodrigo Belo](https://github.com/rodrigobelo)
38-
- [Sérgio Ribeiro](https://github.com/sergioamribeiro)
3934

4035
## Get in touch
4136

samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,15 @@ internal static IClusterConfigurationBuilder SetupRetryDurableMongoDb(
9292
.WithTimeToLiveInDays(60)
9393
.Enabled(true)
9494
)
95+
.WithRetryDurableActiveQueuesCountPollingConfiguration(
96+
configure => configure
97+
.Enabled(true)
98+
.WithCronExpression("0 0/1 * 1/1 * ? *")
99+
.Do((numberOfActiveQueues) =>
100+
{
101+
Console.Write($"Number of mongodb active queues {numberOfActiveQueues}");
102+
})
103+
)
95104
))
96105
.AddTypedHandlers(
97106
handlers => handlers
@@ -174,7 +183,18 @@ internal static IClusterConfigurationBuilder SetupRetryDurableSqlServer(
174183
.WithCleanupPollingConfiguration(
175184
configure => configure
176185
.Enabled(false)
186+
.WithCronExpression("0 0/1 * 1/1 * ? *")
177187
)
188+
.WithRetryDurableActiveQueuesCountPollingConfiguration(
189+
configure => configure
190+
.Enabled(true)
191+
.WithCronExpression("0 0/1 * 1/1 * ? *")
192+
.Do((numberOfActiveQueues) =>
193+
{
194+
Console.Write($"Number of sql server active queues {numberOfActiveQueues}");
195+
})
196+
)
197+
178198
))
179199
.AddTypedHandlers(
180200
handlers => handlers

samples/KafkaFlow.Retry.Sample/Program.cs

Lines changed: 104 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,19 @@ private static async Task Main()
2121
var mongoDbDatabaseName = "kafka_flow_retry_durable_sample";
2222
var mongoDbRetryQueueCollectionName = "RetryQueues";
2323
var mongoDbRetryQueueItemCollectionName = "RetryQueueItems";
24-
var sqlServerConnectionString =
25-
"Server=localhost;Trusted_Connection=True; Pooling=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Sample";
24+
var sqlServerConnectionString = string.Join(
25+
string.Empty,
26+
"Server=localhost;",
27+
"Trusted_Connection=false;",
28+
"TrustServerCertificate=true;",
29+
"Integrated Security=false;",
30+
"Pooling=true;",
31+
"Min Pool Size=1;",
32+
"Max Pool Size=100;",
33+
"MultipleActiveResultSets=true;",
34+
"Application Name=KafkaFlow Retry Tests;",
35+
"Encrypt=false;"
36+
);
2637
var sqlServerDatabaseName = "kafka_flow_retry_durable_sample";
2738
var topics = new[]
2839
{
@@ -76,106 +87,106 @@ private static async Task Main()
7687
switch (input)
7788
{
7889
case "retry-durable-mongodb":
79-
{
80-
Console.Write("Number of the distinct messages to produce: ");
81-
int.TryParse(Console.ReadLine(), out var numOfMessages);
82-
Console.Write("Number of messages with same partition key: ");
83-
int.TryParse(Console.ReadLine(), out var numOfMessagesWithSamePartitionkey);
84-
var messages = Enumerable
85-
.Range(0, numOfMessages)
86-
.SelectMany(
87-
x =>
88-
{
89-
var partitionKey = Guid.NewGuid().ToString();
90-
return Enumerable
91-
.Range(0, numOfMessagesWithSamePartitionkey)
92-
.Select(y => new BatchProduceItem(
93-
"sample-kafka-flow-retry-durable-mongodb-topic",
94-
partitionKey,
95-
new RetryDurableTestMessage { Text = $"Message({y}): {Guid.NewGuid()}" },
96-
null))
97-
.ToList();
98-
}
99-
)
100-
.ToList();
101-
102-
await producers["kafka-flow-retry-durable-mongodb-producer"]
103-
.BatchProduceAsync(messages)
104-
.ConfigureAwait(false);
105-
Console.WriteLine("Published");
106-
}
90+
{
91+
Console.Write("Number of the distinct messages to produce: ");
92+
int.TryParse(Console.ReadLine(), out var numOfMessages);
93+
Console.Write("Number of messages with same partition key: ");
94+
int.TryParse(Console.ReadLine(), out var numOfMessagesWithSamePartitionkey);
95+
var messages = Enumerable
96+
.Range(0, numOfMessages)
97+
.SelectMany(
98+
x =>
99+
{
100+
var partitionKey = Guid.NewGuid().ToString();
101+
return Enumerable
102+
.Range(0, numOfMessagesWithSamePartitionkey)
103+
.Select(y => new BatchProduceItem(
104+
"sample-kafka-flow-retry-durable-mongodb-topic",
105+
partitionKey,
106+
new RetryDurableTestMessage { Text = $"Message({y}): {Guid.NewGuid()}" },
107+
null))
108+
.ToList();
109+
}
110+
)
111+
.ToList();
112+
113+
await producers["kafka-flow-retry-durable-mongodb-producer"]
114+
.BatchProduceAsync(messages)
115+
.ConfigureAwait(false);
116+
Console.WriteLine("Published");
117+
}
107118
break;
108119

109120
case "retry-durable-sqlserver":
110-
{
111-
Console.Write("Number of the distinct messages to produce: ");
112-
int.TryParse(Console.ReadLine(), out var numOfMessages);
113-
Console.Write("Number of messages with same partition key: ");
114-
int.TryParse(Console.ReadLine(), out var numOfMessagesWithSamePartitionkey);
115-
116-
var messages = Enumerable
117-
.Range(0, numOfMessages)
118-
.SelectMany(
119-
x =>
120-
{
121-
var partitionKey = Guid.NewGuid().ToString();
122-
return Enumerable
123-
.Range(0, numOfMessagesWithSamePartitionkey)
124-
.Select(y => new BatchProduceItem(
125-
"sample-kafka-flow-retry-durable-sqlserver-topic",
126-
partitionKey,
127-
new RetryDurableTestMessage { Text = $"Message({y}): {Guid.NewGuid()}" },
128-
null))
129-
.ToList();
130-
}
131-
)
132-
.ToList();
133-
134-
await producers["kafka-flow-retry-durable-sqlserver-producer"]
135-
.BatchProduceAsync(messages)
136-
.ConfigureAwait(false);
137-
Console.WriteLine("Published");
138-
}
121+
{
122+
Console.Write("Number of the distinct messages to produce: ");
123+
int.TryParse(Console.ReadLine(), out var numOfMessages);
124+
Console.Write("Number of messages with same partition key: ");
125+
int.TryParse(Console.ReadLine(), out var numOfMessagesWithSamePartitionkey);
126+
127+
var messages = Enumerable
128+
.Range(0, numOfMessages)
129+
.SelectMany(
130+
x =>
131+
{
132+
var partitionKey = Guid.NewGuid().ToString();
133+
return Enumerable
134+
.Range(0, numOfMessagesWithSamePartitionkey)
135+
.Select(y => new BatchProduceItem(
136+
"sample-kafka-flow-retry-durable-sqlserver-topic",
137+
partitionKey,
138+
new RetryDurableTestMessage { Text = $"Message({y}): {Guid.NewGuid()}" },
139+
null))
140+
.ToList();
141+
}
142+
)
143+
.ToList();
144+
145+
await producers["kafka-flow-retry-durable-sqlserver-producer"]
146+
.BatchProduceAsync(messages)
147+
.ConfigureAwait(false);
148+
Console.WriteLine("Published");
149+
}
139150
break;
140151

141152
case "retry-forever":
142-
{
143-
Console.Write("Number of messages to produce: ");
144-
int.TryParse(Console.ReadLine(), out var numOfMessages);
145-
await producers["kafka-flow-retry-forever-producer"]
146-
.BatchProduceAsync(
147-
Enumerable
148-
.Range(0, numOfMessages)
149-
.Select(
150-
x => new BatchProduceItem(
151-
"sample-kafka-flow-retry-forever-topic",
152-
"partition-key",
153-
new RetryForeverTestMessage { Text = $"Message({x}): {Guid.NewGuid()}" },
154-
null))
155-
.ToList())
156-
.ConfigureAwait(false);
157-
Console.WriteLine("Published");
158-
}
153+
{
154+
Console.Write("Number of messages to produce: ");
155+
int.TryParse(Console.ReadLine(), out var numOfMessages);
156+
await producers["kafka-flow-retry-forever-producer"]
157+
.BatchProduceAsync(
158+
Enumerable
159+
.Range(0, numOfMessages)
160+
.Select(
161+
x => new BatchProduceItem(
162+
"sample-kafka-flow-retry-forever-topic",
163+
"partition-key",
164+
new RetryForeverTestMessage { Text = $"Message({x}): {Guid.NewGuid()}" },
165+
null))
166+
.ToList())
167+
.ConfigureAwait(false);
168+
Console.WriteLine("Published");
169+
}
159170
break;
160171

161172
case "retry-simple":
162-
{
163-
Console.Write("Number of messages to produce:");
164-
int.TryParse(Console.ReadLine(), out var numOfMessages);
165-
await producers["kafka-flow-retry-simple-producer"]
166-
.BatchProduceAsync(
167-
Enumerable
168-
.Range(0, numOfMessages)
169-
.Select(
170-
x => new BatchProduceItem(
171-
"sample-kafka-flow-retry-simple-topic",
172-
"partition-key",
173-
new RetrySimpleTestMessage { Text = $"Message({x}): {Guid.NewGuid()}" },
174-
null))
175-
.ToList())
176-
.ConfigureAwait(false);
177-
Console.WriteLine("Published");
178-
}
173+
{
174+
Console.Write("Number of messages to produce:");
175+
int.TryParse(Console.ReadLine(), out var numOfMessages);
176+
await producers["kafka-flow-retry-simple-producer"]
177+
.BatchProduceAsync(
178+
Enumerable
179+
.Range(0, numOfMessages)
180+
.Select(
181+
x => new BatchProduceItem(
182+
"sample-kafka-flow-retry-simple-topic",
183+
"partition-key",
184+
new RetrySimpleTestMessage { Text = $"Message({x}): {Guid.NewGuid()}" },
185+
null))
186+
.ToList())
187+
.ConfigureAwait(false);
188+
Console.WriteLine("Published");
189+
}
179190
break;
180191

181192
case "exit":

src/KafkaFlow.Retry.MongoDb/Repositories/IRetryQueueRepository.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ namespace KafkaFlow.Retry.MongoDb.Repositories;
1111

1212
internal interface IRetryQueueRepository
1313
{
14+
Task<long> CountQueuesAsync(string searchGroupKey, RetryQueueStatus status);
15+
1416
Task<DeleteQueuesResult> DeleteQueuesAsync(IEnumerable<Guid> queueIds);
1517

1618
Task<RetryQueueDbo> GetQueueAsync(string queueGroupKey);

src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueRepository.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ public RetryQueueRepository(DbContext dbContext)
2222
_dbContext = dbContext;
2323
}
2424

25+
public async Task<long> CountQueuesAsync(string searchGroupKey, RetryQueueStatus status)
26+
{
27+
var queuesFilterBuilder = _dbContext.RetryQueues.GetFilters();
28+
29+
var findFilter = queuesFilterBuilder.Eq(q => q.SearchGroupKey, searchGroupKey)
30+
& queuesFilterBuilder.Eq(q => q.Status, status);
31+
32+
return await _dbContext.RetryQueues.CountDocumentsAsync(findFilter).ConfigureAwait(false);
33+
}
34+
2535
public async Task<DeleteQueuesResult> DeleteQueuesAsync(IEnumerable<Guid> queueIds)
2636
{
2737
var queuesFilterBuilder = _dbContext.RetryQueues.GetFilters();

src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,17 @@ public async Task<QueuePendingItemsResult> CheckQueuePendingItemsAsync(QueuePend
105105
return new QueuePendingItemsResult(QueuePendingItemsResultStatus.NoPendingItems);
106106
}
107107

108+
public async Task<long> CountQueuesAsync(CountQueuesInput input)
109+
{
110+
Guard.Argument(input, nameof(input)).NotNull();
111+
112+
return await _retryQueueRepository
113+
.CountQueuesAsync(
114+
input.SearchGroupKey,
115+
input.Status)
116+
.ConfigureAwait(false);
117+
}
118+
108119
public async Task<DeleteQueuesResult> DeleteQueuesAsync(DeleteQueuesInput input)
109120
{
110121
Guard.Argument(input, nameof(input)).NotNull();

src/KafkaFlow.Retry.Postgres/Repositories/IRetryQueueRepository.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ namespace KafkaFlow.Retry.Postgres.Repositories;
1010
internal interface IRetryQueueRepository
1111
{
1212
Task<long> AddAsync(IDbConnection dbConnection, RetryQueueDbo retryQueueDbo);
13-
13+
14+
Task<long> CountQueueAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus);
15+
1416
Task<int> DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete);
1517

1618
Task<bool> ExistsActiveAsync(IDbConnection dbConnection, string queueGroupKey);

0 commit comments

Comments
 (0)