diff --git a/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/QueueFlagTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/QueueFlagTests.cs new file mode 100644 index 000000000..aef64bcd5 --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/QueueFlagTests.cs @@ -0,0 +1,141 @@ +using System.Threading.Tasks; +using Cleipnir.ResilientFunctions.Queuing; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Shouldly; + +namespace Cleipnir.ResilientFunctions.Tests.UtilsTests; + +[TestClass] +public class QueueFlagTests +{ + [TestMethod] + public async Task RaiseBeforeWaitCompletesImmediately() + { + var flag = new QueueFlag(); + + flag.Raise(); + var task = flag.WaitForRaised(); + + task.IsCompleted.ShouldBeTrue(); + await task; + } + + [TestMethod] + public async Task WaitBeforeRaiseCompletesAfterRaise() + { + var flag = new QueueFlag(); + + var waitTask = flag.WaitForRaised(); + waitTask.IsCompleted.ShouldBeFalse(); + + flag.Raise(); + + await waitTask.ShouldCompleteIn(100); + } + + [TestMethod] + public async Task MultipleRaisesOnlySignalsOnce() + { + var flag = new QueueFlag(); + + flag.Raise(); + flag.Raise(); + flag.Raise(); + + var firstWait = flag.WaitForRaised(); + firstWait.IsCompleted.ShouldBeTrue(); + + var secondWait = flag.WaitForRaised(); + secondWait.IsCompleted.ShouldBeFalse(); + + flag.Raise(); + await secondWait.ShouldCompleteIn(100); + } + + [TestMethod] + public async Task AlternatingRaiseAndWaitWorks() + { + var flag = new QueueFlag(); + + for (var i = 0; i < 5; i++) + { + flag.Raise(); + var task = flag.WaitForRaised(); + task.IsCompleted.ShouldBeTrue(); + await task; + } + } + + [TestMethod] + public async Task AlternatingWaitAndRaiseWorks() + { + var flag = new QueueFlag(); + + for (var i = 0; i < 5; i++) + { + var waitTask = flag.WaitForRaised(); + waitTask.IsCompleted.ShouldBeFalse(); + + flag.Raise(); + await waitTask.ShouldCompleteIn(100); + } + } + + [TestMethod] + public async Task ConcurrentProducerConsumerScenario() + { + var flag = new QueueFlag(); + var producerCount = 0; + var consumerCount = 0; + var iterations = 100; + + var producer = Task.Run(async () => + { + for (var i = 0; i < iterations; i++) + { + producerCount++; + flag.Raise(); + await Task.Delay(1); + } + }); + + var consumer = Task.Run(async () => + { + for (var i = 0; i < iterations; i++) + { + await flag.WaitForRaised(); + consumerCount++; + } + }); + + await Task.WhenAll(producer, consumer).ShouldCompleteIn(5000); + + producerCount.ShouldBe(iterations); + consumerCount.ShouldBe(iterations); + } + + [TestMethod] + public async Task RaiseWithNoWaiterThenWaitCompletesImmediately() + { + var flag = new QueueFlag(); + + flag.Raise(); + await Task.Delay(10); + + var task = flag.WaitForRaised(); + task.IsCompleted.ShouldBeTrue(); + await task; + } +} + +internal static class TaskExtensions +{ + public static async Task ShouldCompleteIn(this Task task, int milliseconds) + { + var completedTask = await Task.WhenAny(task, Task.Delay(milliseconds)); + if (completedTask != task) + throw new AssertFailedException($"Task did not complete within {milliseconds}ms"); + + await task; + } +} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs b/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs deleted file mode 100644 index 51393c443..000000000 --- a/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System.Threading.Tasks; - -namespace Cleipnir.ResilientFunctions.Queuing; - -public interface IQueue -{ - Task Next() where T : notnull; -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/IQueueStore.cs b/Core/Cleipnir.ResilientFunctions/Queuing/IQueueStore.cs deleted file mode 100644 index cc14d6a2d..000000000 --- a/Core/Cleipnir.ResilientFunctions/Queuing/IQueueStore.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Domain; - -namespace Cleipnir.ResilientFunctions.Queuing; - -public interface IQueueStore -{ - Task Peek(FlowId flowId); - Task Pop(FlowId flowId); - Task Push(FlowId flowId, StoredQueueItem storedQueueItem); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs new file mode 100644 index 000000000..747569362 --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Cleipnir.ResilientFunctions.Queuing; + +public class QueueBuilder(IEnumerable> predicates, QueueManager manager) +{ + internal bool CanHandle(object msg) + { + if (msg is not T) + return false; + + return predicates.All(f => f(msg)); + } + + public QueueBuilder Where(Func predicate) + => new( + predicates.Append(msg => msg is T && predicate((T)msg)), + manager + ); + + public QueueBuilder OfType() where TChild : T + => new QueueBuilder(predicates, manager); + + public async Task Next() + { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs new file mode 100644 index 000000000..43fb20b88 --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs @@ -0,0 +1,38 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace Cleipnir.ResilientFunctions.Queuing; + +public class QueueFlag +{ + private readonly Lock _sync = new(); + private TaskCompletionSource _waiter = new(); + + public async Task WaitForRaised() + { + TaskCompletionSource waiter; + lock (_sync) + { + waiter = _waiter; + if (waiter.Task.IsCompleted) + { + _waiter = new TaskCompletionSource(); + return; + } + } + + await waiter.Task; + + lock (_sync) + _waiter = new(); + } + + public void Raise() + { + TaskCompletionSource waiter; + lock (_sync) + waiter = _waiter; + + waiter.TrySetResult(); + } +} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs index 11c374d55..846d84ca7 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs @@ -1,6 +1,6 @@ -using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.CoreRuntime.Serialization; using Cleipnir.ResilientFunctions.Messaging; @@ -9,11 +9,12 @@ namespace Cleipnir.ResilientFunctions.Queuing; public delegate bool MessagePredicate(object message); -public delegate Task MessageHandler(object message, long position, string? idempotencyKey); public class QueueManager(StoredId storedId, IMessageStore messageStore, ISerializer serializer) { - private Dictionary _subscribers = new(); + private readonly Dictionary _subscribers = new(); + private readonly Lock _lock = new(); + private int _nextId; public async Task Run() { @@ -28,21 +29,36 @@ public async Task Run() toDeliver[max++] = new MessageWithPosition(msg, position, idempotencyKey); } - foreach (var (key, (message, position, idempotencyKey)) in toDeliver.ToList()) + foreach (var (key, messageWithPosition) in toDeliver.ToList()) { - foreach (var subscriber in _subscribers.Values) + var (message, position, idempotencyKey) = messageWithPosition; + foreach (var kv in _subscribers.ToList()) { + var subscriberId = kv.Key; + var subscriber = kv.Value; if (subscriber.Predicate(message)) { - await subscriber.Handler(messages, position, idempotencyKey); + _subscribers.Remove(subscriberId); + subscriber.Tcs.SetResult(messageWithPosition); toDeliver.Remove(key); } } } } } + + public Task Subscribe(MessagePredicate predicate) + { + var tcs = new TaskCompletionSource(); + lock (_lock) + { + var id = _nextId++; + _subscribers[id] = new Subscription(predicate, tcs); + } + + return tcs.Task; + } - private record MessageWithPosition(object Message, long Position, string? IdempotencyKey); - - private record Subscriber(int Id, MessagePredicate Predicate, MessageHandler Handler); + public record MessageWithPosition(object Message, long Position, string? IdempotencyKey); + private record Subscription(MessagePredicate Predicate, TaskCompletionSource Tcs); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs new file mode 100644 index 000000000..0967d4114 --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs @@ -0,0 +1,9 @@ +using System; +using System.Collections.Generic; + +namespace Cleipnir.ResilientFunctions.Queuing; + +public class QueueSubscription(IEnumerable> predicates, QueueManager queueManager) +{ + +} \ No newline at end of file