From 06cc7b172c718402a8985c334de5f58bb29a477c Mon Sep 17 00:00:00 2001 From: stidsborg Date: Thu, 13 Nov 2025 20:05:12 +0100 Subject: [PATCH 1/4] wip --- .../Queuing/IQueue.cs | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs b/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs index 51393c44..606365dd 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs @@ -1,8 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; +using System.Transactions; namespace Cleipnir.ResilientFunctions.Queuing; public interface IQueue { Task Next() where T : notnull; +} + +public interface IQueueBuilder +{ + public IQueueBuilder Where(Func predicate); + public IQueueBuilder OfType() where T2 : T; +} + +public class QueueBuilder(IEnumerable> predicates) +{ + public QueueBuilder Where(Func predicate) + => new( + predicates.Append(msg => msg is T && predicate((T)msg)) + ); + + public QueueBuilder OfType() where TChild : T + => new(predicates.Append(msg => msg is TChild)); } \ No newline at end of file From 4373727df9e9b8f5105e4edd9497db7836bf8983 Mon Sep 17 00:00:00 2001 From: Thomas Stidsborg Sylvest Date: Fri, 14 Nov 2025 12:37:39 +0100 Subject: [PATCH 2/4] wip --- .../Queuing/IQueue.cs | 32 +++++++++++++------ .../Queuing/QueueManager.cs | 2 ++ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs b/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs index 606365dd..ce389295 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using System.Transactions; namespace Cleipnir.ResilientFunctions.Queuing; @@ -11,19 +10,32 @@ public interface IQueue Task Next() where T : notnull; } -public interface IQueueBuilder +public class QueueBuilder(IEnumerable> predicates, QueueManager manager) { - public IQueueBuilder Where(Func predicate); - public IQueueBuilder OfType() where T2 : T; -} + internal bool CanHandle(object msg) + { + if (msg is not T) + return false; -public class QueueBuilder(IEnumerable> predicates) -{ + return predicates.All(f => f(msg)); + } + public QueueBuilder Where(Func predicate) => new( - predicates.Append(msg => msg is T && predicate((T)msg)) + predicates.Append(msg => msg is T && predicate((T)msg)), + manager ); + + public QueueBuilder OfType() where TChild : T + => new QueueBuilder(predicates, manager); + + public async Task Next() + { + + } +} + +public class QueueSubscription(IEnumerable> predicates, QueueManager queueManager) +{ - public QueueBuilder OfType() where TChild : T - => new(predicates.Append(msg => msg is TChild)); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs index 11c374d5..2bec0826 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs @@ -42,6 +42,8 @@ public async Task Run() } } + public int Subscribe(); + private record MessageWithPosition(object Message, long Position, string? IdempotencyKey); private record Subscriber(int Id, MessagePredicate Predicate, MessageHandler Handler); From 7564eedceb36abe4d739de5607f15d806cd52a64 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sat, 15 Nov 2025 09:38:47 +0100 Subject: [PATCH 3/4] wip --- .../UtilsTests/QueueFlagTests.cs | 141 ++++++++++++++++++ .../Queuing/IQueueStore.cs | 11 -- .../Queuing/{IQueue.cs => QueueBuilder.cs} | 14 +- .../Queuing/QueueFlag.cs | 45 ++++++ .../Queuing/QueueManager.cs | 36 +++-- .../Queuing/QueueSubscription.cs | 9 ++ 6 files changed, 222 insertions(+), 34 deletions(-) create mode 100644 Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/QueueFlagTests.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Queuing/IQueueStore.cs rename Core/Cleipnir.ResilientFunctions/Queuing/{IQueue.cs => QueueBuilder.cs} (75%) create mode 100644 Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs create mode 100644 Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs diff --git a/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/QueueFlagTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/QueueFlagTests.cs new file mode 100644 index 00000000..aef64bcd --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/QueueFlagTests.cs @@ -0,0 +1,141 @@ +using System.Threading.Tasks; +using Cleipnir.ResilientFunctions.Queuing; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Shouldly; + +namespace Cleipnir.ResilientFunctions.Tests.UtilsTests; + +[TestClass] +public class QueueFlagTests +{ + [TestMethod] + public async Task RaiseBeforeWaitCompletesImmediately() + { + var flag = new QueueFlag(); + + flag.Raise(); + var task = flag.WaitForRaised(); + + task.IsCompleted.ShouldBeTrue(); + await task; + } + + [TestMethod] + public async Task WaitBeforeRaiseCompletesAfterRaise() + { + var flag = new QueueFlag(); + + var waitTask = flag.WaitForRaised(); + waitTask.IsCompleted.ShouldBeFalse(); + + flag.Raise(); + + await waitTask.ShouldCompleteIn(100); + } + + [TestMethod] + public async Task MultipleRaisesOnlySignalsOnce() + { + var flag = new QueueFlag(); + + flag.Raise(); + flag.Raise(); + flag.Raise(); + + var firstWait = flag.WaitForRaised(); + firstWait.IsCompleted.ShouldBeTrue(); + + var secondWait = flag.WaitForRaised(); + secondWait.IsCompleted.ShouldBeFalse(); + + flag.Raise(); + await secondWait.ShouldCompleteIn(100); + } + + [TestMethod] + public async Task AlternatingRaiseAndWaitWorks() + { + var flag = new QueueFlag(); + + for (var i = 0; i < 5; i++) + { + flag.Raise(); + var task = flag.WaitForRaised(); + task.IsCompleted.ShouldBeTrue(); + await task; + } + } + + [TestMethod] + public async Task AlternatingWaitAndRaiseWorks() + { + var flag = new QueueFlag(); + + for (var i = 0; i < 5; i++) + { + var waitTask = flag.WaitForRaised(); + waitTask.IsCompleted.ShouldBeFalse(); + + flag.Raise(); + await waitTask.ShouldCompleteIn(100); + } + } + + [TestMethod] + public async Task ConcurrentProducerConsumerScenario() + { + var flag = new QueueFlag(); + var producerCount = 0; + var consumerCount = 0; + var iterations = 100; + + var producer = Task.Run(async () => + { + for (var i = 0; i < iterations; i++) + { + producerCount++; + flag.Raise(); + await Task.Delay(1); + } + }); + + var consumer = Task.Run(async () => + { + for (var i = 0; i < iterations; i++) + { + await flag.WaitForRaised(); + consumerCount++; + } + }); + + await Task.WhenAll(producer, consumer).ShouldCompleteIn(5000); + + producerCount.ShouldBe(iterations); + consumerCount.ShouldBe(iterations); + } + + [TestMethod] + public async Task RaiseWithNoWaiterThenWaitCompletesImmediately() + { + var flag = new QueueFlag(); + + flag.Raise(); + await Task.Delay(10); + + var task = flag.WaitForRaised(); + task.IsCompleted.ShouldBeTrue(); + await task; + } +} + +internal static class TaskExtensions +{ + public static async Task ShouldCompleteIn(this Task task, int milliseconds) + { + var completedTask = await Task.WhenAny(task, Task.Delay(milliseconds)); + if (completedTask != task) + throw new AssertFailedException($"Task did not complete within {milliseconds}ms"); + + await task; + } +} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/IQueueStore.cs b/Core/Cleipnir.ResilientFunctions/Queuing/IQueueStore.cs deleted file mode 100644 index cc14d6a2..00000000 --- a/Core/Cleipnir.ResilientFunctions/Queuing/IQueueStore.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Domain; - -namespace Cleipnir.ResilientFunctions.Queuing; - -public interface IQueueStore -{ - Task Peek(FlowId flowId); - Task Pop(FlowId flowId); - Task Push(FlowId flowId, StoredQueueItem storedQueueItem); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs similarity index 75% rename from Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs rename to Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs index ce389295..74756936 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/IQueue.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs @@ -5,11 +5,6 @@ namespace Cleipnir.ResilientFunctions.Queuing; -public interface IQueue -{ - Task Next() where T : notnull; -} - public class QueueBuilder(IEnumerable> predicates, QueueManager manager) { internal bool CanHandle(object msg) @@ -29,13 +24,8 @@ public QueueBuilder Where(Func predicate) public QueueBuilder OfType() where TChild : T => new QueueBuilder(predicates, manager); - public async Task Next() + public async Task Next() { - + throw new NotImplementedException(); } -} - -public class QueueSubscription(IEnumerable> predicates, QueueManager queueManager) -{ - } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs new file mode 100644 index 00000000..929ea74e --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs @@ -0,0 +1,45 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace Cleipnir.ResilientFunctions.Queuing; + +public class QueueFlag +{ + private readonly Lock _sync = new(); + private TaskCompletionSource? _waiter = null; + private bool _signaled; + + public Task WaitForRaised() + { + TaskCompletionSource waiter; + lock (_sync) + if (_signaled) + { + _signaled = false; + return Task.CompletedTask; + } + else + { + waiter = _waiter = new(); + } + + return waiter.Task; + } + + public void Raise() + { + TaskCompletionSource? waiter = null; + lock (_sync) + if (_signaled) + return; + else if (_waiter == null) + _signaled = true; + else + { + waiter = _waiter; + _waiter = null; + } + + waiter?.SetResult(); + } +} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs index 2bec0826..846d84ca 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs @@ -1,6 +1,6 @@ -using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.CoreRuntime.Serialization; using Cleipnir.ResilientFunctions.Messaging; @@ -9,11 +9,12 @@ namespace Cleipnir.ResilientFunctions.Queuing; public delegate bool MessagePredicate(object message); -public delegate Task MessageHandler(object message, long position, string? idempotencyKey); public class QueueManager(StoredId storedId, IMessageStore messageStore, ISerializer serializer) { - private Dictionary _subscribers = new(); + private readonly Dictionary _subscribers = new(); + private readonly Lock _lock = new(); + private int _nextId; public async Task Run() { @@ -28,23 +29,36 @@ public async Task Run() toDeliver[max++] = new MessageWithPosition(msg, position, idempotencyKey); } - foreach (var (key, (message, position, idempotencyKey)) in toDeliver.ToList()) + foreach (var (key, messageWithPosition) in toDeliver.ToList()) { - foreach (var subscriber in _subscribers.Values) + var (message, position, idempotencyKey) = messageWithPosition; + foreach (var kv in _subscribers.ToList()) { + var subscriberId = kv.Key; + var subscriber = kv.Value; if (subscriber.Predicate(message)) { - await subscriber.Handler(messages, position, idempotencyKey); + _subscribers.Remove(subscriberId); + subscriber.Tcs.SetResult(messageWithPosition); toDeliver.Remove(key); } } } } } + + public Task Subscribe(MessagePredicate predicate) + { + var tcs = new TaskCompletionSource(); + lock (_lock) + { + var id = _nextId++; + _subscribers[id] = new Subscription(predicate, tcs); + } + + return tcs.Task; + } - public int Subscribe(); - - private record MessageWithPosition(object Message, long Position, string? IdempotencyKey); - - private record Subscriber(int Id, MessagePredicate Predicate, MessageHandler Handler); + public record MessageWithPosition(object Message, long Position, string? IdempotencyKey); + private record Subscription(MessagePredicate Predicate, TaskCompletionSource Tcs); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs new file mode 100644 index 00000000..0967d411 --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueSubscription.cs @@ -0,0 +1,9 @@ +using System; +using System.Collections.Generic; + +namespace Cleipnir.ResilientFunctions.Queuing; + +public class QueueSubscription(IEnumerable> predicates, QueueManager queueManager) +{ + +} \ No newline at end of file From 5e4275031dc0c562ea2683729b933e61616fcc94 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sat, 15 Nov 2025 14:32:09 +0100 Subject: [PATCH 4/4] Simplified QueueFlag --- .../Queuing/QueueFlag.cs | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs index 929ea74e..43fb20b8 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueFlag.cs @@ -6,40 +6,33 @@ namespace Cleipnir.ResilientFunctions.Queuing; public class QueueFlag { private readonly Lock _sync = new(); - private TaskCompletionSource? _waiter = null; - private bool _signaled; + private TaskCompletionSource _waiter = new(); - public Task WaitForRaised() + public async Task WaitForRaised() { TaskCompletionSource waiter; lock (_sync) - if (_signaled) + { + waiter = _waiter; + if (waiter.Task.IsCompleted) { - _signaled = false; - return Task.CompletedTask; + _waiter = new TaskCompletionSource(); + return; } - else - { - waiter = _waiter = new(); - } + } - return waiter.Task; + await waiter.Task; + + lock (_sync) + _waiter = new(); } public void Raise() { - TaskCompletionSource? waiter = null; + TaskCompletionSource waiter; lock (_sync) - if (_signaled) - return; - else if (_waiter == null) - _signaled = true; - else - { - waiter = _waiter; - _waiter = null; - } + waiter = _waiter; - waiter?.SetResult(); + waiter.TrySetResult(); } } \ No newline at end of file