Skip to content
Open

Queue #111

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/QueueFlagTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Queuing;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Shouldly;

namespace Cleipnir.ResilientFunctions.Tests.UtilsTests;

[TestClass]
public class QueueFlagTests
{
[TestMethod]
public async Task RaiseBeforeWaitCompletesImmediately()
{
var flag = new QueueFlag();

flag.Raise();
var task = flag.WaitForRaised();

task.IsCompleted.ShouldBeTrue();
await task;
}

[TestMethod]
public async Task WaitBeforeRaiseCompletesAfterRaise()
{
var flag = new QueueFlag();

var waitTask = flag.WaitForRaised();
waitTask.IsCompleted.ShouldBeFalse();

flag.Raise();

await waitTask.ShouldCompleteIn(100);
}

[TestMethod]
public async Task MultipleRaisesOnlySignalsOnce()
{
var flag = new QueueFlag();

flag.Raise();
flag.Raise();
flag.Raise();

var firstWait = flag.WaitForRaised();
firstWait.IsCompleted.ShouldBeTrue();

var secondWait = flag.WaitForRaised();
secondWait.IsCompleted.ShouldBeFalse();

flag.Raise();
await secondWait.ShouldCompleteIn(100);
}

[TestMethod]
public async Task AlternatingRaiseAndWaitWorks()
{
var flag = new QueueFlag();

for (var i = 0; i < 5; i++)
{
flag.Raise();
var task = flag.WaitForRaised();
task.IsCompleted.ShouldBeTrue();
await task;
}
}

[TestMethod]
public async Task AlternatingWaitAndRaiseWorks()
{
var flag = new QueueFlag();

for (var i = 0; i < 5; i++)
{
var waitTask = flag.WaitForRaised();
waitTask.IsCompleted.ShouldBeFalse();

flag.Raise();
await waitTask.ShouldCompleteIn(100);
}
}

[TestMethod]
public async Task ConcurrentProducerConsumerScenario()
{
var flag = new QueueFlag();
var producerCount = 0;
var consumerCount = 0;
var iterations = 100;

var producer = Task.Run(async () =>
{
for (var i = 0; i < iterations; i++)
{
producerCount++;
flag.Raise();
await Task.Delay(1);
}
});

var consumer = Task.Run(async () =>
{
for (var i = 0; i < iterations; i++)
{
await flag.WaitForRaised();
consumerCount++;
}
});

await Task.WhenAll(producer, consumer).ShouldCompleteIn(5000);

producerCount.ShouldBe(iterations);
consumerCount.ShouldBe(iterations);
}

[TestMethod]
public async Task RaiseWithNoWaiterThenWaitCompletesImmediately()
{
var flag = new QueueFlag();

flag.Raise();
await Task.Delay(10);

var task = flag.WaitForRaised();
task.IsCompleted.ShouldBeTrue();
await task;
}
}

internal static class TaskExtensions
{
public static async Task ShouldCompleteIn(this Task task, int milliseconds)
{
var completedTask = await Task.WhenAny(task, Task.Delay(milliseconds));
if (completedTask != task)
throw new AssertFailedException($"Task did not complete within {milliseconds}ms");

await task;
}
}
8 changes: 0 additions & 8 deletions Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs

This file was deleted.

11 changes: 0 additions & 11 deletions Core/Cleipnir.ResilientFunctions/Queuing/IQueueStore.cs

This file was deleted.

31 changes: 31 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Cleipnir.ResilientFunctions.Queuing;

public class QueueBuilder<T>(IEnumerable<Func<object, bool>> predicates, QueueManager manager)
{
internal bool CanHandle(object msg)
{
if (msg is not T)
return false;

return predicates.All(f => f(msg));
}

public QueueBuilder<T> Where(Func<T, bool> predicate)
=> new(
predicates.Append(msg => msg is T && predicate((T)msg)),
manager
);

public QueueBuilder<TChild> OfType<TChild>() where TChild : T
=> new QueueBuilder<TChild>(predicates, manager);

public async Task<T> Next()

Check warning on line 27 in Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 27 in Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 27 in Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 27 in Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
throw new NotImplementedException();
}
}
38 changes: 38 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System.Threading;
using System.Threading.Tasks;

namespace Cleipnir.ResilientFunctions.Queuing;

public class QueueFlag
{
private readonly Lock _sync = new();
private TaskCompletionSource _waiter = new();

public async Task WaitForRaised()
{
TaskCompletionSource waiter;
lock (_sync)
{
waiter = _waiter;
if (waiter.Task.IsCompleted)
{
_waiter = new TaskCompletionSource();
return;
}
}

await waiter.Task;

lock (_sync)
_waiter = new();
}

public void Raise()
{
TaskCompletionSource waiter;
lock (_sync)
waiter = _waiter;

waiter.TrySetResult();
}
}
34 changes: 25 additions & 9 deletions Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
using Cleipnir.ResilientFunctions.Messaging;
Expand All @@ -9,11 +9,12 @@
namespace Cleipnir.ResilientFunctions.Queuing;

public delegate bool MessagePredicate(object message);
public delegate Task MessageHandler(object message, long position, string? idempotencyKey);

public class QueueManager(StoredId storedId, IMessageStore messageStore, ISerializer serializer)
{
private Dictionary<int, Subscriber> _subscribers = new();
private readonly Dictionary<int, Subscription> _subscribers = new();
private readonly Lock _lock = new();
private int _nextId;

public async Task Run()
{
Expand All @@ -28,21 +29,36 @@ public async Task Run()
toDeliver[max++] = new MessageWithPosition(msg, position, idempotencyKey);
}

foreach (var (key, (message, position, idempotencyKey)) in toDeliver.ToList())
foreach (var (key, messageWithPosition) in toDeliver.ToList())
{
foreach (var subscriber in _subscribers.Values)
var (message, position, idempotencyKey) = messageWithPosition;
foreach (var kv in _subscribers.ToList())
{
var subscriberId = kv.Key;
var subscriber = kv.Value;
if (subscriber.Predicate(message))
{
await subscriber.Handler(messages, position, idempotencyKey);
_subscribers.Remove(subscriberId);
subscriber.Tcs.SetResult(messageWithPosition);
toDeliver.Remove(key);
}
}
}
}
}

public Task<MessageWithPosition> Subscribe(MessagePredicate predicate)
{
var tcs = new TaskCompletionSource<MessageWithPosition>();
lock (_lock)
{
var id = _nextId++;
_subscribers[id] = new Subscription(predicate, tcs);
}

return tcs.Task;
}

private record MessageWithPosition(object Message, long Position, string? IdempotencyKey);

private record Subscriber(int Id, MessagePredicate Predicate, MessageHandler Handler);
public record MessageWithPosition(object Message, long Position, string? IdempotencyKey);
private record Subscription(MessagePredicate Predicate, TaskCompletionSource<MessageWithPosition> Tcs);
}
9 changes: 9 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System;
using System.Collections.Generic;

namespace Cleipnir.ResilientFunctions.Queuing;

public class QueueSubscription<T>(IEnumerable<Func<object, bool>> predicates, QueueManager queueManager)

Check warning on line 6 in Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs

View workflow job for this annotation

GitHub Actions / build

Parameter 'queueManager' is unread.

Check warning on line 6 in Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs

View workflow job for this annotation

GitHub Actions / build

Parameter 'predicates' is unread.

Check warning on line 6 in Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs

View workflow job for this annotation

GitHub Actions / build

Parameter 'queueManager' is unread.

Check warning on line 6 in Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs

View workflow job for this annotation

GitHub Actions / build

Parameter 'predicates' is unread.

Check warning on line 6 in Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs

View workflow job for this annotation

GitHub Actions / build

Parameter 'queueManager' is unread.

Check warning on line 6 in Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs

View workflow job for this annotation

GitHub Actions / build

Parameter 'predicates' is unread.

Check warning on line 6 in Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs

View workflow job for this annotation

GitHub Actions / build

Parameter 'queueManager' is unread.

Check warning on line 6 in Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs

View workflow job for this annotation

GitHub Actions / build

Parameter 'predicates' is unread.
{

}