From d2724483f33c56cb388b64398466f45df06eb8e3 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 16 Nov 2025 10:50:52 +0100 Subject: [PATCH 1/7] Effect refactoring --- .../Domain/Effect.cs | 86 +++++++++++++++++-- 1 file changed, 78 insertions(+), 8 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs index 8301d2456..50cb515ad 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs @@ -66,22 +66,92 @@ internal async Task Get(EffectId effectId) #region Implicit ids public Task Capture(Action work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) - => Capture(id: EffectContext.CurrentContext.NextImplicitId(), work, resiliency); + => InnerCapture( + id: EffectContext.CurrentContext.NextImplicitId(), + EffectType.Effect, + work: () => + { + work(); + return Task.CompletedTask; + }, + resiliency, + EffectContext.CurrentContext, + retryPolicy: null + ); + public Task Capture(Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) - => Capture(id: EffectContext.CurrentContext.NextImplicitId(), work: () => work().ToTask(), resiliency); + => InnerCapture( + id: EffectContext.CurrentContext.NextImplicitId(), + EffectType.Effect, + work: () => work().ToTask(), + resiliency, + EffectContext.CurrentContext, + retryPolicy: null + ); + public Task Capture(Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) - => Capture(id: EffectContext.CurrentContext.NextImplicitId(), work, resiliency); + => InnerCapture( + id: EffectContext.CurrentContext.NextImplicitId(), + EffectType.Effect, + work, + resiliency, + EffectContext.CurrentContext, + retryPolicy: null + ); + public Task Capture(Func> work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) - => Capture(id: EffectContext.CurrentContext.NextImplicitId(), work, resiliency); + => InnerCapture( + id: EffectContext.CurrentContext.NextImplicitId(), + EffectType.Effect, + work, + resiliency, + EffectContext.CurrentContext, + retryPolicy: null + ); public Task Capture(Action work, RetryPolicy retryPolicy, bool flush = true) - => Capture(id: EffectContext.CurrentContext.NextImplicitId(), work, retryPolicy, flush); + => InnerCapture( + id: EffectContext.CurrentContext.NextImplicitId(), + EffectType.Effect, + work: () => + { + work(); + return Task.CompletedTask; + }, + flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, + EffectContext.CurrentContext, + retryPolicy + ); + public Task Capture(Func work, RetryPolicy retryPolicy, bool flush = true) - => Capture(id: EffectContext.CurrentContext.NextImplicitId(), work: () => work().ToTask(), retryPolicy, flush); + => InnerCapture( + id: EffectContext.CurrentContext.NextImplicitId(), + EffectType.Effect, + work: () => work().ToTask(), + flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, + EffectContext.CurrentContext, + retryPolicy + ); + public Task Capture(Func work, RetryPolicy retryPolicy, bool flush = true) - => Capture(id: EffectContext.CurrentContext.NextImplicitId(), work, retryPolicy, flush); + => InnerCapture( + id: EffectContext.CurrentContext.NextImplicitId(), + EffectType.Effect, + work, + flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, + EffectContext.CurrentContext, + retryPolicy + ); + public Task Capture(Func> work, RetryPolicy retryPolicy, bool flush = true) - => Capture(id: EffectContext.CurrentContext.NextImplicitId(), work, retryPolicy, flush); + => InnerCapture( + id: EffectContext.CurrentContext.NextImplicitId(), + EffectType.Effect, + work, + flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, + EffectContext.CurrentContext, + retryPolicy + ); #endregion From b7e6b0203de28e4a6f2f02cc50d4f7e288060307 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 16 Nov 2025 11:13:01 +0100 Subject: [PATCH 2/7] wip --- .../Domain/Effect.cs | 63 +++++++++++-------- .../Domain/EffectResults.cs | 26 ++++---- .../Storage/Types.cs | 16 ++--- 3 files changed, 59 insertions(+), 46 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs index 50cb515ad..55646f12d 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs @@ -32,11 +32,12 @@ public class Effect(EffectResults effectResults, UtcNow utcNow, FlowMinimumTimeo public async Task Mark(string id) { - var effectId = CreateEffectId(id); + var usedId = EffectContext.CurrentContext.NextImplicitId(); + var effectId = CreateEffectId(usedId); if (await effectResults.Contains(effectId)) return false; - var storedEffect = StoredEffect.CreateCompleted(effectId); + var storedEffect = StoredEffect.CreateCompleted(effectId, alias: id); await effectResults.Set(storedEffect, flush: true); return true; } @@ -68,6 +69,7 @@ internal async Task Get(EffectId effectId) public Task Capture(Action work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) => InnerCapture( id: EffectContext.CurrentContext.NextImplicitId(), + alias: null, EffectType.Effect, work: () => { @@ -82,6 +84,7 @@ public Task Capture(Action work, ResiliencyLevel resiliency = ResiliencyLevel.At public Task Capture(Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) => InnerCapture( id: EffectContext.CurrentContext.NextImplicitId(), + alias: null, EffectType.Effect, work: () => work().ToTask(), resiliency, @@ -92,6 +95,7 @@ public Task Capture(Func work, ResiliencyLevel resiliency = ResiliencyL public Task Capture(Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) => InnerCapture( id: EffectContext.CurrentContext.NextImplicitId(), + alias: null, EffectType.Effect, work, resiliency, @@ -102,6 +106,7 @@ public Task Capture(Func work, ResiliencyLevel resiliency = ResiliencyLeve public Task Capture(Func> work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) => InnerCapture( id: EffectContext.CurrentContext.NextImplicitId(), + alias: null, EffectType.Effect, work, resiliency, @@ -112,6 +117,7 @@ public Task Capture(Func> work, ResiliencyLevel resiliency = Resil public Task Capture(Action work, RetryPolicy retryPolicy, bool flush = true) => InnerCapture( id: EffectContext.CurrentContext.NextImplicitId(), + alias: null, EffectType.Effect, work: () => { @@ -126,6 +132,7 @@ public Task Capture(Action work, RetryPolicy retryPolicy, bool flush = true) public Task Capture(Func work, RetryPolicy retryPolicy, bool flush = true) => InnerCapture( id: EffectContext.CurrentContext.NextImplicitId(), + alias: null, EffectType.Effect, work: () => work().ToTask(), flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, @@ -136,6 +143,7 @@ public Task Capture(Func work, RetryPolicy retryPolicy, bool flush = tr public Task Capture(Func work, RetryPolicy retryPolicy, bool flush = true) => InnerCapture( id: EffectContext.CurrentContext.NextImplicitId(), + alias: null, EffectType.Effect, work, flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, @@ -146,6 +154,7 @@ public Task Capture(Func work, RetryPolicy retryPolicy, bool flush = true) public Task Capture(Func> work, RetryPolicy retryPolicy, bool flush = true) => InnerCapture( id: EffectContext.CurrentContext.NextImplicitId(), + alias: null, EffectType.Effect, work, flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, @@ -155,50 +164,54 @@ public Task Capture(Func> work, RetryPolicy retryPolicy, bool flus #endregion - public Task Capture(string id, Action work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) - => Capture(id, work: () => { work(); return Task.CompletedTask; }, resiliency); - public Task Capture(string id, Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) - => Capture(id, work: () => work().ToTask(), resiliency); - public async Task Capture(string id, Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) - => await InnerCapture(id, EffectType.Effect, work, resiliency, EffectContext.CurrentContext, retryPolicy: null); - public async Task Capture(string id, Func> work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) - => await InnerCapture(id, EffectType.Effect, work, resiliency, EffectContext.CurrentContext, retryPolicy: null); + public Task Capture(string alias, Action work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) + => Capture(alias, work: () => { work(); return Task.CompletedTask; }, resiliency); + public Task Capture(string alias, Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) + => Capture(alias, work: () => work().ToTask(), resiliency); + public async Task Capture(string alias, Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) + => await InnerCapture(EffectContext.CurrentContext.NextImplicitId(), alias, EffectType.Effect, work, resiliency, EffectContext.CurrentContext, retryPolicy: null); + public async Task Capture(string alias, Func> work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) + => await InnerCapture(EffectContext.CurrentContext.NextImplicitId(), alias, EffectType.Effect, work, resiliency, EffectContext.CurrentContext, retryPolicy: null); - public Task Capture(string id, Action work, RetryPolicy retryPolicy, bool flush = true) - => Capture(id, work: () => { work(); return Task.CompletedTask; }, retryPolicy, flush); - public Task Capture(string id, Func work, RetryPolicy retryPolicy, bool flush = true) - => Capture(id, work: () => work().ToTask(), retryPolicy, flush); - public async Task Capture(string id, Func work, RetryPolicy retryPolicy, bool flush = true) - => await InnerCapture(id, EffectType.Effect, work, flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, EffectContext.CurrentContext, retryPolicy); - public async Task Capture(string id, Func> work, RetryPolicy retryPolicy, bool flush = true) - => await InnerCapture(id, EffectType.Effect, work, flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, EffectContext.CurrentContext, retryPolicy); - - private async Task InnerCapture(string id, EffectType effectType, Func work, ResiliencyLevel resiliency, EffectContext effectContext, RetryPolicy? retryPolicy) + public Task Capture(string alias, Action work, RetryPolicy retryPolicy, bool flush = true) + => Capture(alias, work: () => { work(); return Task.CompletedTask; }, retryPolicy, flush); + public Task Capture(string alias, Func work, RetryPolicy retryPolicy, bool flush = true) + => Capture(alias, work: () => work().ToTask(), retryPolicy, flush); + public async Task Capture(string alias, Func work, RetryPolicy retryPolicy, bool flush = true) + => await InnerCapture(EffectContext.CurrentContext.NextImplicitId(), alias, EffectType.Effect, work, flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, EffectContext.CurrentContext, retryPolicy); + public async Task Capture(string alias, Func> work, RetryPolicy retryPolicy, bool flush = true) + => await InnerCapture(EffectContext.CurrentContext.NextImplicitId(), alias, EffectType.Effect, work, flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, EffectContext.CurrentContext, retryPolicy); + + private async Task InnerCapture(string id, string? alias, EffectType effectType, Func work, ResiliencyLevel resiliency, EffectContext effectContext, RetryPolicy? retryPolicy) { if (retryPolicy != null && resiliency == ResiliencyLevel.AtMostOnce) throw new InvalidOperationException("Retry policy cannot be used with AtMostOnce resiliency"); if (retryPolicy == null) - await effectResults.InnerCapture(id, effectType, work, resiliency, effectContext); + await effectResults.InnerCapture(id, alias, effectType, work, resiliency, effectContext); else await effectResults.InnerCapture( - id, effectType, + id, + alias, + effectType, work: () => retryPolicy.Invoke(work, effect: this, utcNow, flowMinimumTimeout), resiliency, effectContext ); } - private async Task InnerCapture(string id, EffectType effectType, Func> work, ResiliencyLevel resiliency, EffectContext effectContext, RetryPolicy? retryPolicy) + private async Task InnerCapture(string id, string? alias, EffectType effectType, Func> work, ResiliencyLevel resiliency, EffectContext effectContext, RetryPolicy? retryPolicy) { if (retryPolicy != null && resiliency == ResiliencyLevel.AtMostOnce) throw new InvalidOperationException("Retry policy cannot be used with AtMostOnce resiliency"); if (retryPolicy == null) - return await effectResults.InnerCapture(id, effectType, work, resiliency, effectContext); + return await effectResults.InnerCapture(id, alias, effectType, work, resiliency, effectContext); return await effectResults.InnerCapture( - id, effectType, + id, + alias, + effectType, work: () => retryPolicy.Invoke(work, effect: this, utcNow, flowMinimumTimeout), resiliency, effectContext diff --git a/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs b/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs index def6ee014..cebbc3ed9 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs @@ -87,7 +87,7 @@ public async Task CreateOrGet(EffectId effectId, T value, bool flush) throw serializer.DeserializeException(flowId, existing.StoredEffect.StoredException!); } - var storedEffect = StoredEffect.CreateCompleted(effectId, serializer.Serialize(value)); + var storedEffect = StoredEffect.CreateCompleted(effectId, serializer.Serialize(value), alias: null); await FlushOrAddToPending( storedEffect.EffectId, storedEffect, @@ -102,7 +102,7 @@ internal async Task Upsert(EffectId effectId, T value, bool flush) { await InitializeIfRequired(); - var storedEffect = StoredEffect.CreateCompleted(effectId, serializer.Serialize(value)); + var storedEffect = StoredEffect.CreateCompleted(effectId, serializer.Serialize(value), alias: null); await FlushOrAddToPending( storedEffect.EffectId, storedEffect, @@ -117,7 +117,7 @@ internal async Task Upserts(IEnumerable> values, bool fl var storedEffects = values .Select(t => new { Id = t.Item1, Bytes = serializer.Serialize(t.Item2, t.Item2.GetType()) }) - .Select(a => StoredEffect.CreateCompleted(a.Id, a.Bytes)) + .Select(a => StoredEffect.CreateCompleted(a.Id, a.Bytes, alias: null)) .ToList(); AddToPending(storedEffects); @@ -149,7 +149,7 @@ public async Task> TryGet(EffectId effectId) return Option.NoValue; } - public async Task InnerCapture(string id, EffectType effectType, Func work, ResiliencyLevel resiliency, EffectContext effectContext) + public async Task InnerCapture(string id, string? alias, EffectType effectType, Func work, ResiliencyLevel resiliency, EffectContext effectContext) { await InitializeIfRequired(); @@ -170,7 +170,7 @@ public async Task InnerCapture(string id, EffectType effectType, Func work if (resiliency == ResiliencyLevel.AtMostOnce) { - var storedEffect = StoredEffect.CreateStarted(effectId); + var storedEffect = StoredEffect.CreateStarted(effectId, alias); await FlushOrAddToPending(effectId, storedEffect, flush: true, delete: false); } @@ -185,7 +185,7 @@ public async Task InnerCapture(string id, EffectType effectType, Func work catch (FatalWorkflowException exception) { var storedException = serializer.SerializeException(exception); - var storedEffect = StoredEffect.CreateFailed(effectId, storedException); + var storedEffect = StoredEffect.CreateFailed(effectId, storedException, alias); await FlushOrAddToPending( storedEffect.EffectId, storedEffect, @@ -200,7 +200,7 @@ await FlushOrAddToPending( { var fatalWorkflowException = FatalWorkflowException.CreateNonGeneric(flowId, exception); var storedException = serializer.SerializeException(fatalWorkflowException); - var storedEffect = StoredEffect.CreateFailed(effectId, storedException); + var storedEffect = StoredEffect.CreateFailed(effectId, storedException, alias); await FlushOrAddToPending( storedEffect.EffectId, storedEffect, @@ -212,7 +212,7 @@ await FlushOrAddToPending( } { - var storedEffect = StoredEffect.CreateCompleted(effectId); + var storedEffect = StoredEffect.CreateCompleted(effectId, alias); await FlushOrAddToPending( storedEffect.EffectId, storedEffect, @@ -222,7 +222,7 @@ await FlushOrAddToPending( } } - public async Task InnerCapture(string id, EffectType effectType, Func> work, ResiliencyLevel resiliency, EffectContext effectContext) + public async Task InnerCapture(string id, string? alias, EffectType effectType, Func> work, ResiliencyLevel resiliency, EffectContext effectContext) { await InitializeIfRequired(); @@ -242,7 +242,7 @@ public async Task InnerCapture(string id, EffectType effectType, Func EffectId.ToStoredEffectId(); - public static StoredEffect CreateCompleted(EffectId effectId, byte[] result) - => new(effectId, WorkStatus.Completed, result, StoredException: null); - public static StoredEffect CreateCompleted(EffectId effectId) - => new(effectId, WorkStatus.Completed, Result: null, StoredException: null); - public static StoredEffect CreateStarted(EffectId effectId) - => new(effectId, WorkStatus.Started, Result: null, StoredException: null); - public static StoredEffect CreateFailed(EffectId effectId, StoredException storedException) - => new(effectId, WorkStatus.Failed, Result: null, storedException); + public static StoredEffect CreateCompleted(EffectId effectId, byte[] result, string? alias) + => new(effectId, WorkStatus.Completed, result, StoredException: null, alias); + public static StoredEffect CreateCompleted(EffectId effectId, string? alias) + => new(effectId, WorkStatus.Completed, Result: null, StoredException: null, alias); + public static StoredEffect CreateStarted(EffectId effectId, string? alias) + => new(effectId, WorkStatus.Started, Result: null, StoredException: null, alias); + public static StoredEffect CreateFailed(EffectId effectId, StoredException storedException, string? alias) + => new(effectId, WorkStatus.Failed, Result: null, storedException, alias); public byte[] Serialize() { From 266953634b8d7575eca53a9a93f9698a7f8838ab Mon Sep 17 00:00:00 2001 From: stidsborg Date: Mon, 17 Nov 2025 06:50:43 +0100 Subject: [PATCH 3/7] wip --- .../StoredEffectSerializationTests.cs | 26 ++-- .../AtMostOnceWorkStatusTests.cs | 2 +- .../FunctionTests/EffectTests.cs | 55 +++---- .../Domain/Effect.cs | 31 +++- .../Domain/EffectResults.cs | 16 ++ .../Domain/ExistingEffects.cs | 139 +++++++++++++++--- 6 files changed, 209 insertions(+), 60 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoredEffectSerializationTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoredEffectSerializationTests.cs index 1bb30082c..0a9def137 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoredEffectSerializationTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoredEffectSerializationTests.cs @@ -13,7 +13,7 @@ public void CompletedStoredEffectWithResultCanBeSerializedAndDeserialized() { var effectId = new EffectId("SomeEffect", EffectType.Effect, Context: ""); var result = "SomeResult"u8.ToArray(); - var storedEffect = StoredEffect.CreateCompleted(effectId, result); + var storedEffect = StoredEffect.CreateCompleted(effectId, result, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -28,7 +28,7 @@ public void CompletedStoredEffectWithResultCanBeSerializedAndDeserialized() public void CompletedStoredEffectWithoutResultCanBeSerializedAndDeserialized() { var effectId = new EffectId("SomeEffect", EffectType.Effect, Context: ""); - var storedEffect = StoredEffect.CreateCompleted(effectId); + var storedEffect = StoredEffect.CreateCompleted(effectId, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -43,7 +43,7 @@ public void CompletedStoredEffectWithoutResultCanBeSerializedAndDeserialized() public void StartedStoredEffectCanBeSerializedAndDeserialized() { var effectId = new EffectId("SomeEffect", EffectType.Effect, Context: ""); - var storedEffect = StoredEffect.CreateStarted(effectId); + var storedEffect = StoredEffect.CreateStarted(effectId, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -63,7 +63,7 @@ public void FailedStoredEffectWithExceptionCanBeSerializedAndDeserialized() ExceptionStackTrace: "at SomeMethod() in SomeFile.cs:line 42", ExceptionType: "System.InvalidOperationException" ); - var storedEffect = StoredEffect.CreateFailed(effectId, storedException); + var storedEffect = StoredEffect.CreateFailed(effectId, storedException, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -82,7 +82,7 @@ public void StoredEffectWithStateTypeCanBeSerializedAndDeserialized() { var effectId = new EffectId("SomeState", EffectType.State, Context: ""); var result = "{\"key\":\"value\"}"u8.ToArray(); - var storedEffect = StoredEffect.CreateCompleted(effectId, result); + var storedEffect = StoredEffect.CreateCompleted(effectId, result, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -99,7 +99,7 @@ public void StoredEffectWithContextCanBeSerializedAndDeserialized() var parentEffect = new EffectId("ParentEffect", EffectType.Effect, Context: ""); var effectId = new EffectId("ChildEffect", EffectType.Effect, Context: parentEffect.Serialize().Value); var result = "SomeData"u8.ToArray(); - var storedEffect = StoredEffect.CreateCompleted(effectId, result); + var storedEffect = StoredEffect.CreateCompleted(effectId, result, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -114,7 +114,7 @@ public void StoredEffectWithContextCanBeSerializedAndDeserialized() public void StoredEffectWithTimeoutTypeCanBeSerializedAndDeserialized() { var effectId = new EffectId("SomeTimeout", EffectType.Timeout, Context: ""); - var storedEffect = StoredEffect.CreateStarted(effectId); + var storedEffect = StoredEffect.CreateStarted(effectId, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -128,7 +128,7 @@ public void StoredEffectWithTimeoutTypeCanBeSerializedAndDeserialized() public void StoredEffectWithRetryTypeCanBeSerializedAndDeserialized() { var effectId = new EffectId("SomeRetry", EffectType.Retry, Context: ""); - var storedEffect = StoredEffect.CreateCompleted(effectId); + var storedEffect = StoredEffect.CreateCompleted(effectId, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -142,7 +142,7 @@ public void StoredEffectWithRetryTypeCanBeSerializedAndDeserialized() public void StoredEffectWithSystemTypeCanBeSerializedAndDeserialized() { var effectId = new EffectId("SomeSystem", EffectType.System, Context: ""); - var storedEffect = StoredEffect.CreateCompleted(effectId); + var storedEffect = StoredEffect.CreateCompleted(effectId, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -160,7 +160,7 @@ public void StoredEffectWithLargeResultCanBeSerializedAndDeserialized() for (int i = 0; i < largeResult.Length; i++) largeResult[i] = (byte)(i % 256); - var storedEffect = StoredEffect.CreateCompleted(effectId, largeResult); + var storedEffect = StoredEffect.CreateCompleted(effectId, largeResult, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -175,7 +175,7 @@ public void StoredEffectWithSpecialCharactersInIdCanBeSerializedAndDeserialized( { var effectId = new EffectId("Effect.With\\Special.Characters", EffectType.Effect, Context: ""); var result = "Data"u8.ToArray(); - var storedEffect = StoredEffect.CreateCompleted(effectId, result); + var storedEffect = StoredEffect.CreateCompleted(effectId, result, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -190,7 +190,7 @@ public void StoredEffectWithSpecialCharactersInIdCanBeSerializedAndDeserialized( public void StoredEffectWithEmptyIdCanBeSerializedAndDeserialized() { var effectId = new EffectId("", EffectType.Effect, Context: ""); - var storedEffect = StoredEffect.CreateCompleted(effectId); + var storedEffect = StoredEffect.CreateCompleted(effectId, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); @@ -209,7 +209,7 @@ public void StoredEffectWithNullStackTraceCanBeSerializedAndDeserialized() ExceptionStackTrace: null, ExceptionType: "System.Exception" ); - var storedEffect = StoredEffect.CreateFailed(effectId, storedException); + var storedEffect = StoredEffect.CreateFailed(effectId, storedException, alias: null); var serialized = storedEffect.Serialize(); var deserialized = StoredEffect.Deserialize(serialized); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/AtMostOnceWorkStatusTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/AtMostOnceWorkStatusTests.cs index 2f3f4efab..ed1f63c0c 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/AtMostOnceWorkStatusTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/AtMostOnceWorkStatusTests.cs @@ -95,7 +95,7 @@ async Task(string param, Workflow workflow) => { await workflow.Effect .Capture( - id: "id", + "id", work: () => { counter.Increment(); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs index dd06c57c1..574a612ad 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs @@ -32,7 +32,7 @@ async Task(string param, Workflow workflow) => { var (effect, _) = workflow; await effect.Capture( - id: "Test", + "Test", work: () => syncedCounter.Increment() ); }); @@ -46,14 +46,14 @@ await BusyWait.Until(() => syncedCounter.Current.ShouldBe(1); var effectResults = await store.EffectsStore.GetEffectResults(storedId); - effectResults.Single(r => r.EffectId == "Test".ToEffectId()).WorkStatus.ShouldBe(WorkStatus.Completed); + effectResults.Single(r => r.Alias == "Test").WorkStatus.ShouldBe(WorkStatus.Completed); var controlPanel = await rAction.ControlPanel(flowId.Instance); controlPanel.ShouldNotBeNull(); await controlPanel.Restart(); effectResults = await store.EffectsStore.GetEffectResults(storedId); - effectResults.Single(r => r.EffectId == "Test".ToEffectId()).WorkStatus.ShouldBe(WorkStatus.Completed); + effectResults.Single(r => r.Alias == "Test").WorkStatus.ShouldBe(WorkStatus.Completed); syncedCounter.Current.ShouldBe(1); } @@ -71,7 +71,7 @@ async Task(string param, Workflow workflow) => { var (effect, _) = workflow; await effect.Capture( - id: "Test", + "Test", work: () => { syncedCounter.Increment(); return Task.CompletedTask; }); }); @@ -84,14 +84,14 @@ await BusyWait.Until(() => syncedCounter.Current.ShouldBe(1); var effectResults = await store.EffectsStore.GetEffectResults(storedId); - effectResults.Single(r => r.EffectId == "Test".ToEffectId()).WorkStatus.ShouldBe(WorkStatus.Completed); + effectResults.Single(r => r.Alias == "Test").WorkStatus.ShouldBe(WorkStatus.Completed); var controlPanel = await rAction.ControlPanel(flowId.Instance); controlPanel.ShouldNotBeNull(); await controlPanel.Restart(); effectResults = await store.EffectsStore.GetEffectResults(storedId); - effectResults.Single(r => r.EffectId == "Test".ToEffectId()).WorkStatus.ShouldBe(WorkStatus.Completed); + effectResults.Single(r => r.Alias == "Test").WorkStatus.ShouldBe(WorkStatus.Completed); syncedCounter.Current.ShouldBe(1); } @@ -109,7 +109,7 @@ async Task(string param, Workflow workflow) => { var (effect, _) = workflow; await effect.Capture( - id: "Test", + "Test", work: () => { syncedCounter.Increment(); @@ -126,7 +126,7 @@ await BusyWait.Until(() => syncedCounter.Current.ShouldBe(1); var effectResults = await store.EffectsStore.GetEffectResults(storedId); - var storedEffect = effectResults.Single(r => r.EffectId == "Test".ToEffectId()); + var storedEffect = effectResults.Single(r => r.Alias == "Test"); storedEffect.WorkStatus.ShouldBe(WorkStatus.Completed); storedEffect.Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello"); @@ -135,7 +135,7 @@ await BusyWait.Until(() => await controlPanel.Restart(); effectResults = await store.EffectsStore.GetEffectResults(storedId); - storedEffect = effectResults.Single(r => r.EffectId == "Test".ToEffectId()); + storedEffect = effectResults.Single(r => r.Alias == "Test"); storedEffect.WorkStatus.ShouldBe(WorkStatus.Completed); storedEffect.Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello"); syncedCounter.Current.ShouldBe(1); @@ -155,7 +155,7 @@ async Task(string param, Workflow workflow) => { var (effect, _) = workflow; await effect.Capture( - id: "Test", + "Test", work: () => { syncedCounter.Increment(); @@ -172,7 +172,7 @@ await BusyWait.Until(() => syncedCounter.Current.ShouldBe(1); var effectResults = await store.EffectsStore.GetEffectResults(storedId); - var storedEffect = effectResults.Single(r => r.EffectId == "Test".ToEffectId()); + var storedEffect = effectResults.Single(r => r.Alias == "Test"); storedEffect.WorkStatus.ShouldBe(WorkStatus.Completed); storedEffect.Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello"); @@ -181,7 +181,7 @@ await BusyWait.Until(() => await controlPanel.Restart(); effectResults = await store.EffectsStore.GetEffectResults(storedId); - storedEffect = effectResults.Single(r => r.EffectId == "Test".ToEffectId()); + storedEffect = effectResults.Single(r => r.Alias == "Test"); storedEffect.WorkStatus.ShouldBe(WorkStatus.Completed); storedEffect.Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello"); syncedCounter.Current.ShouldBe(1); @@ -201,7 +201,7 @@ async Task(string param, Workflow workflow) => { var (effect, _) = workflow; await effect.Capture( - id: "Test", + "Test", work: () => { syncedCounter.Increment(); @@ -218,7 +218,7 @@ await BusyWait.Until(() => syncedCounter.Current.ShouldBe(1); var effectResults = await store.EffectsStore.GetEffectResults(storedId); - var storedEffect = effectResults.Single(r => r.EffectId == "Test".ToEffectId()); + var storedEffect = effectResults.Single(r => r.Alias == "Test"); storedEffect.WorkStatus.ShouldBe(WorkStatus.Failed); storedEffect.StoredException.ShouldNotBeNull(); storedEffect.StoredException.ExceptionType.ShouldContain("InvalidOperationException"); @@ -228,7 +228,7 @@ await BusyWait.Until(() => await Should.ThrowAsync(() => controlPanel.Restart()); effectResults = await store.EffectsStore.GetEffectResults(storedId); - storedEffect = effectResults.Single(r => r.EffectId == "Test".ToEffectId()); + storedEffect = effectResults.Single(r => r.Alias == "Test"); storedEffect.WorkStatus.ShouldBe(WorkStatus.Failed); storedEffect.StoredException.ShouldNotBeNull(); storedEffect.StoredException.ExceptionType.ShouldContain("InvalidOperationException"); @@ -257,7 +257,7 @@ async Task (string param, Workflow workflow) => var storedId = rAction.MapToStoredId(flowId.Instance); var effectResults = await store.EffectsStore.GetEffectResults(storedId); - var storedEffect = effectResults.Single(r => r.EffectId == "WhenAny".ToEffectId()); + var storedEffect = effectResults.Single(r => r.Alias == "WhenAny"); storedEffect.WorkStatus.ShouldBe(WorkStatus.Completed); storedEffect.Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe(2); } @@ -284,7 +284,7 @@ async Task (string param, Workflow workflow) => var storedId = rAction.MapToStoredId(flowId.Instance); var effectResults = await store.EffectsStore.GetEffectResults(storedId); - var storedEffect = effectResults.Single(r => r.EffectId == "WhenAll".ToEffectId()); + var storedEffect = effectResults.Single(r => r.Alias == "WhenAll"); storedEffect.WorkStatus.ShouldBe(WorkStatus.Completed); storedEffect.Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe(new [] {1, 2}); } @@ -487,10 +487,11 @@ await effect.Capture("GrandParent", async () => var effectResults = await store.EffectsStore.GetEffectResults(storedId); var subEffectValue1Id = effectResults.Single(se => se.EffectId.Id == "SubEffectValue1").EffectId; - subEffectValue1Id.Context.ShouldBe("EGrandParent.EMother"); - + // With implicit IDs for Capture effects, context uses implicit IDs + subEffectValue1Id.Context.ShouldBe("E0.E0"); + var subEffectValue2Id = effectResults.Single(se => se.EffectId.Id == "SubEffectValue2").EffectId; - subEffectValue2Id.Context.ShouldBe("EGrandParent.EFather"); + subEffectValue2Id.Context.ShouldBe("E0.E1"); } public abstract Task ExceptionThrownInsideEffectBecomesFatalWorkflowException(); @@ -572,7 +573,7 @@ async Task> (string message, Workflow workflow) => { var (effect, _) = workflow; return await effect.Capture( - id: "Test", + "Test", work: () => Option.Create(message) ); }); @@ -695,8 +696,9 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task< var storedEffects = await effectStore.GetEffectResults(storedId); storedEffects.Count.ShouldBe(2); - storedEffects.Single(se => se.EffectId.Id == "1").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello world"); - storedEffects.Single(se => se.EffectId.Id == "2").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello universe"); + // "1" and "2" are now aliases + storedEffects.Single(se => se.Alias == "1").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello world"); + storedEffects.Single(se => se.Alias == "2").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello universe"); } public abstract Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow(); @@ -737,7 +739,7 @@ async Task (workflow) => await cp.Refresh(); effectIds = (await cp.Effects.AllIds).ToList(); effectIds.Count().ShouldBe(1); - effectIds.Single().Id.ShouldBe("SomeEffectId"); + // Effect can be retrieved by alias (await cp.Effects.GetValue("SomeEffectId")).ShouldBe(someEffectIdValue); } @@ -777,8 +779,9 @@ public async Task UpsertingExistingEffectDoesNotAffectOtherExistingEffects(Task< var storedEffects = await effectStore.GetEffectResults(storedId); storedEffects.Count.ShouldBe(2); - storedEffects.Single(se => se.EffectId.Id == "1").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello world again"); - storedEffects.Single(se => se.EffectId.Id == "2").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello universe"); + // "1" and "2" are now aliases + storedEffects.Single(se => se.Alias == "1").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello world again"); + storedEffects.Single(se => se.Alias == "2").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello universe"); } public abstract Task CaptureEffectWithRetryPolicy(); diff --git a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs index 55646f12d..01c0c764b 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs @@ -45,7 +45,21 @@ public async Task Mark(string id) public Task CreateOrGet(string id, T value, bool flush = true) => CreateOrGet(CreateEffectId(id), value, flush); internal Task CreateOrGet(EffectId effectId, T value, bool flush) => effectResults.CreateOrGet(effectId, value, flush); - public async Task Upsert(string id, T value, bool flush = true) => await Upsert(CreateEffectId(id, EffectType.Effect), value, flush); + public async Task Upsert(string id, T value, bool flush = true) + { + // Try to resolve the id as either an effect ID or alias + var resolvedEffectId = await effectResults.ResolveEffectIdOrAlias(id); + if (resolvedEffectId != null) + { + await Upsert(resolvedEffectId, value, flush); + } + else + { + // If not found, create a new effect with the given ID + await Upsert(CreateEffectId(id, EffectType.Effect), value, flush); + } + } + internal Task Upsert(EffectId effectId, T value, bool flush) => effectResults.Upsert(effectId, value, flush); internal Task Upserts(IEnumerable> values, bool flush) @@ -218,7 +232,20 @@ private async Task InnerCapture(string id, string? alias, EffectType effec ); } - public Task Clear(string id) => effectResults.Clear(CreateEffectId(id), flush: true); + public async Task Clear(string id) + { + // Try to resolve the id as either an effect ID or alias + var resolvedEffectId = await effectResults.ResolveEffectIdOrAlias(id); + if (resolvedEffectId != null) + { + await effectResults.Clear(resolvedEffectId, flush: true); + } + else + { + // If not found, try to clear using the given ID directly + await effectResults.Clear(CreateEffectId(id), flush: true); + } + } public Task WhenAny(string id, params Task[] tasks) => Capture(id, work: async () => await await Task.WhenAny(tasks)); diff --git a/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs b/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs index cebbc3ed9..04d8d0471 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs @@ -50,6 +50,22 @@ private async Task InitializeIfRequired() } } + internal async Task ResolveEffectIdOrAlias(string idOrAlias) + { + await InitializeIfRequired(); + lock (_sync) + { + // First try to find by ID + var effectId = idOrAlias.ToEffectId(); + if (_effectResults.ContainsKey(effectId)) + return effectId; + + // Then try to find by alias + var effect = _effectResults.FirstOrDefault(e => e.Value.StoredEffect?.Alias == idOrAlias); + return effect.Value?.StoredEffect?.EffectId; + } + } + public async Task Contains(EffectId effectId) { await InitializeIfRequired(); diff --git a/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs b/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs index 7242c1bbc..aead58f33 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs @@ -23,24 +23,68 @@ private async Task> GetStoredEffects() public Task> AllIds => GetStoredEffects().ContinueWith(t => (IEnumerable) t.Result.Keys); - public async Task HasValue(string effectId) => (await GetStoredEffects()).ContainsKey(effectId.ToEffectId()); + public async Task HasValue(string effectId) + { + var storedEffects = await GetStoredEffects(); + // First try to find by ID + if (storedEffects.ContainsKey(effectId.ToEffectId())) + return true; + // Then try to find by alias + return storedEffects.Values.Any(e => e.Alias == effectId); + } + + public async Task GetValue(string effectId) + { + var storedEffects = await GetStoredEffects(); + // First try to find by ID + var effectIdObj = effectId.ToEffectId(); + if (storedEffects.TryGetValue(effectIdObj, out var storedEffect)) + return await GetValueFromStoredEffect(storedEffect, effectId); + + // Then try to find by alias + storedEffect = storedEffects.Values.FirstOrDefault(e => e.Alias == effectId); + if (storedEffect == null) + throw new KeyNotFoundException($"Effect with ID or alias '{effectId}' was not found"); + + return await GetValueFromStoredEffect(storedEffect, effectId); + } - public async Task GetValue(string effectId) => await GetValue(effectId.ToEffectId()); public async Task GetValue(EffectId effectId) { var storedEffects = await GetStoredEffects(); var success = storedEffects.TryGetValue(effectId, out var storedEffect); if (!success) throw new KeyNotFoundException($"Effect '{effectId}' was not found"); - if (storedEffect!.WorkStatus != WorkStatus.Completed) - throw new InvalidOperationException($"Effect '{effectId}' has not completed (but has status '{storedEffect.WorkStatus}')"); - return storedEffect.Result == null - ? default - : serializer.Deserialize(storedEffects[effectId].Result!); + return await GetValueFromStoredEffect(storedEffect!, effectId.ToString()); + } + + private Task GetValueFromStoredEffect(StoredEffect storedEffect, string identifier) + { + if (storedEffect.WorkStatus != WorkStatus.Completed) + throw new InvalidOperationException($"Effect '{identifier}' has not completed (but has status '{storedEffect.WorkStatus}')"); + + return Task.FromResult(storedEffect.Result == null + ? default + : serializer.Deserialize(storedEffect.Result)); + } + + public async Task GetResultBytes(string effectId) + { + var storedEffects = await GetStoredEffects(); + // First try to find by ID + var effectIdObj = effectId.ToEffectId(); + if (storedEffects.TryGetValue(effectIdObj, out var storedEffect)) + return storedEffect.Result; + + // Then try to find by alias + storedEffect = storedEffects.Values.FirstOrDefault(e => e.Alias == effectId); + if (storedEffect == null) + throw new KeyNotFoundException($"Effect with ID or alias '{effectId}' was not found"); + + return storedEffect.Result; } - public async Task GetResultBytes(string effectId) => await GetResultBytes(effectId.ToEffectId()); public async Task GetResultBytes(EffectId effectId) { var storedEffects = await GetStoredEffects(); @@ -60,7 +104,25 @@ public async Task RemoveFailed() await Remove(effectId); } - public Task Remove(string effectId) => Remove(effectId.ToEffectId()); + public async Task Remove(string effectId) + { + var storedEffects = await GetStoredEffects(); + // First try to find by ID + var effectIdObj = effectId.ToEffectId(); + if (storedEffects.ContainsKey(effectIdObj)) + { + await Remove(effectIdObj); + return; + } + + // Then try to find by alias + var storedEffect = storedEffects.FirstOrDefault(e => e.Value.Alias == effectId); + if (storedEffect.Value == null) + throw new KeyNotFoundException($"Effect with ID or alias '{effectId}' was not found"); + + await Remove(storedEffect.Key); + } + public async Task Remove(EffectId effectId) { var storedEffects = await GetStoredEffects(); @@ -79,22 +141,63 @@ private async Task Set(StoredEffect storedEffect) storedEffects[storedEffect.EffectId] = storedEffect; } - public Task SetValue(string effectId, TValue value) => SetValue(effectId.ToEffectId(), value); + public async Task SetValue(string effectId, TValue value) + { + var effectIdObj = await GetEffectIdByIdOrAlias(effectId); + await SetValue(effectIdObj, value); + } + public Task SetValue(EffectId effectId, TValue value) => SetSucceeded(effectId, value); - public Task SetStarted(string effectId) => SetStarted(effectId.ToEffectId()); + public async Task SetStarted(string effectId) + { + var effectIdObj = await GetEffectIdByIdOrAlias(effectId); + await SetStarted(effectIdObj); + } + public Task SetStarted(EffectId effectId) - => Set(new StoredEffect(effectId, WorkStatus.Started, Result: null, StoredException: null)); + => Set(new StoredEffect(effectId, WorkStatus.Started, Result: null, StoredException: null, Alias: null)); + + public async Task SetSucceeded(string effectId) + { + var effectIdObj = await GetEffectIdByIdOrAlias(effectId); + await SetSucceeded(effectIdObj); + } - public Task SetSucceeded(string effectId) => SetSucceeded(effectId.ToEffectId()); public Task SetSucceeded(EffectId effectId) - => Set(new StoredEffect(effectId, WorkStatus.Completed, Result: null, StoredException: null)); + => Set(new StoredEffect(effectId, WorkStatus.Completed, Result: null, StoredException: null, Alias: null)); + + public async Task SetSucceeded(string effectId, TResult result) + { + var effectIdObj = await GetEffectIdByIdOrAlias(effectId); + await SetSucceeded(effectIdObj, result); + } - public Task SetSucceeded(string effectId, TResult result) => SetSucceeded(effectId.ToEffectId(), result); public Task SetSucceeded(EffectId effectId, TResult result) - => Set(new StoredEffect(effectId, WorkStatus.Completed, Result: serializer.Serialize(result), StoredException: null)); + => Set(new StoredEffect(effectId, WorkStatus.Completed, Result: serializer.Serialize(result), StoredException: null, Alias: null)); + + public async Task SetFailed(string effectId, Exception exception) + { + var effectIdObj = await GetEffectIdByIdOrAlias(effectId); + await SetFailed(effectIdObj, exception); + } - public Task SetFailed(string effectId, Exception exception) => SetFailed(effectId.ToEffectId(), exception); public Task SetFailed(EffectId effectId, Exception exception) - => Set(new StoredEffect(effectId, WorkStatus.Failed, Result: null, StoredException: serializer.SerializeException(FatalWorkflowException.CreateNonGeneric(flowId, exception)))); + => Set(new StoredEffect(effectId, WorkStatus.Failed, Result: null, StoredException: serializer.SerializeException(FatalWorkflowException.CreateNonGeneric(flowId, exception)), Alias: null)); + + private async Task GetEffectIdByIdOrAlias(string effectId) + { + var storedEffects = await GetStoredEffects(); + // First try to find by ID + var effectIdObj = effectId.ToEffectId(); + if (storedEffects.ContainsKey(effectIdObj)) + return effectIdObj; + + // Then try to find by alias + var storedEffect = storedEffects.FirstOrDefault(e => e.Value.Alias == effectId); + if (storedEffect.Value == null) + throw new KeyNotFoundException($"Effect with ID or alias '{effectId}' was not found"); + + return storedEffect.Key; + } } \ No newline at end of file From 8dcb6551b33edcd9d7148ab61794ec1d876caf56 Mon Sep 17 00:00:00 2001 From: Thomas Stidsborg Sylvest Date: Mon, 17 Nov 2025 10:55:22 +0100 Subject: [PATCH 4/7] wip --- .../Domain/EffectResults.cs | 32 +++++++-- .../Domain/ExistingEffects.cs | 69 +++++++++++++++---- 2 files changed, 85 insertions(+), 16 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs b/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs index 04d8d0471..ad9d985ff 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs @@ -176,12 +176,24 @@ public async Task InnerCapture(string id, string? alias, EffectType effectType, { var success = _effectResults.TryGetValue(effectId, out var pendingChange); var storedEffect = pendingChange?.StoredEffect; + + // If not found by ID but alias is provided, check by alias + if (!success && alias != null) + { + var effectByAlias = _effectResults.FirstOrDefault(e => e.Value.StoredEffect?.Alias == alias); + if (effectByAlias.Value != null) + { + success = true; + storedEffect = effectByAlias.Value.StoredEffect; + } + } + if (success && storedEffect?.WorkStatus == WorkStatus.Completed) return; if (success && storedEffect?.WorkStatus == WorkStatus.Failed) throw serializer.DeserializeException(flowId, storedEffect.StoredException!); if (success && resiliency == ResiliencyLevel.AtMostOnce) - throw new InvalidOperationException($"Effect '{id}' started but did not complete previously"); + throw new InvalidOperationException($"Effect '{alias ?? id}' started but did not complete previously"); } if (resiliency == ResiliencyLevel.AtMostOnce) @@ -241,19 +253,31 @@ await FlushOrAddToPending( public async Task InnerCapture(string id, string? alias, EffectType effectType, Func> work, ResiliencyLevel resiliency, EffectContext effectContext) { await InitializeIfRequired(); - + var effectId = id.ToEffectId(effectType, context: effectContext.Parent?.Serialize().Value); EffectContext.SetParent(effectId); - + lock (_sync) { var success = _effectResults.TryGetValue(effectId, out var storedEffect); + + // If not found by ID but alias is provided, check by alias + if (!success && alias != null) + { + var effectByAlias = _effectResults.FirstOrDefault(e => e.Value.StoredEffect?.Alias == alias); + if (effectByAlias.Value != null) + { + success = true; + storedEffect = effectByAlias.Value; + } + } + if (success && storedEffect!.StoredEffect?.WorkStatus == WorkStatus.Completed) return (storedEffect.StoredEffect?.Result == null ? default : serializer.Deserialize(storedEffect.StoredEffect?.Result!))!; if (success && storedEffect!.StoredEffect?.WorkStatus == WorkStatus.Failed) throw FatalWorkflowException.Create(flowId, storedEffect.StoredEffect?.StoredException!); if (success && resiliency == ResiliencyLevel.AtMostOnce) - throw new InvalidOperationException($"Effect '{id}' started but did not complete previously"); + throw new InvalidOperationException($"Effect '{alias ?? id}' started but did not complete previously"); } if (resiliency == ResiliencyLevel.AtMostOnce) diff --git a/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs b/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs index aead58f33..36d60ac20 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs @@ -143,16 +143,34 @@ private async Task Set(StoredEffect storedEffect) public async Task SetValue(string effectId, TValue value) { - var effectIdObj = await GetEffectIdByIdOrAlias(effectId); - await SetValue(effectIdObj, value); + var existingEffectId = await TryGetEffectIdByIdOrAlias(effectId); + if (existingEffectId != null) + { + await SetValue(existingEffectId, value); + } + else + { + // Create new effect with effectId as both the ID and alias + var effectIdObj = effectId.ToEffectId(); + await Set(new StoredEffect(effectIdObj, WorkStatus.Completed, Result: serializer.Serialize(value), StoredException: null, Alias: effectId)); + } } public Task SetValue(EffectId effectId, TValue value) => SetSucceeded(effectId, value); public async Task SetStarted(string effectId) { - var effectIdObj = await GetEffectIdByIdOrAlias(effectId); - await SetStarted(effectIdObj); + var existingEffectId = await TryGetEffectIdByIdOrAlias(effectId); + if (existingEffectId != null) + { + await SetStarted(existingEffectId); + } + else + { + // Create new effect with effectId as both the ID and alias + var effectIdObj = effectId.ToEffectId(); + await Set(new StoredEffect(effectIdObj, WorkStatus.Started, Result: null, StoredException: null, Alias: effectId)); + } } public Task SetStarted(EffectId effectId) @@ -160,8 +178,17 @@ public Task SetStarted(EffectId effectId) public async Task SetSucceeded(string effectId) { - var effectIdObj = await GetEffectIdByIdOrAlias(effectId); - await SetSucceeded(effectIdObj); + var existingEffectId = await TryGetEffectIdByIdOrAlias(effectId); + if (existingEffectId != null) + { + await SetSucceeded(existingEffectId); + } + else + { + // Create new effect with effectId as both the ID and alias + var effectIdObj = effectId.ToEffectId(); + await Set(new StoredEffect(effectIdObj, WorkStatus.Completed, Result: null, StoredException: null, Alias: effectId)); + } } public Task SetSucceeded(EffectId effectId) @@ -169,8 +196,17 @@ public Task SetSucceeded(EffectId effectId) public async Task SetSucceeded(string effectId, TResult result) { - var effectIdObj = await GetEffectIdByIdOrAlias(effectId); - await SetSucceeded(effectIdObj, result); + var existingEffectId = await TryGetEffectIdByIdOrAlias(effectId); + if (existingEffectId != null) + { + await SetSucceeded(existingEffectId, result); + } + else + { + // Create new effect with effectId as both the ID and alias + var effectIdObj = effectId.ToEffectId(); + await Set(new StoredEffect(effectIdObj, WorkStatus.Completed, Result: serializer.Serialize(result), StoredException: null, Alias: effectId)); + } } public Task SetSucceeded(EffectId effectId, TResult result) @@ -178,14 +214,23 @@ public Task SetSucceeded(EffectId effectId, TResult result) public async Task SetFailed(string effectId, Exception exception) { - var effectIdObj = await GetEffectIdByIdOrAlias(effectId); - await SetFailed(effectIdObj, exception); + var existingEffectId = await TryGetEffectIdByIdOrAlias(effectId); + if (existingEffectId != null) + { + await SetFailed(existingEffectId, exception); + } + else + { + // Create new effect with effectId as both the ID and alias + var effectIdObj = effectId.ToEffectId(); + await Set(new StoredEffect(effectIdObj, WorkStatus.Failed, Result: null, StoredException: serializer.SerializeException(FatalWorkflowException.CreateNonGeneric(flowId, exception)), Alias: effectId)); + } } public Task SetFailed(EffectId effectId, Exception exception) => Set(new StoredEffect(effectId, WorkStatus.Failed, Result: null, StoredException: serializer.SerializeException(FatalWorkflowException.CreateNonGeneric(flowId, exception)), Alias: null)); - private async Task GetEffectIdByIdOrAlias(string effectId) + private async Task TryGetEffectIdByIdOrAlias(string effectId) { var storedEffects = await GetStoredEffects(); // First try to find by ID @@ -196,7 +241,7 @@ private async Task GetEffectIdByIdOrAlias(string effectId) // Then try to find by alias var storedEffect = storedEffects.FirstOrDefault(e => e.Value.Alias == effectId); if (storedEffect.Value == null) - throw new KeyNotFoundException($"Effect with ID or alias '{effectId}' was not found"); + return null; return storedEffect.Key; } From 7e7fff5c36b9877ebc623bd90975eea1454c7dc3 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Mon, 17 Nov 2025 18:33:08 +0100 Subject: [PATCH 5/7] Fixed Effect's WhenAny & WhenAll methods --- Core/Cleipnir.ResilientFunctions/Domain/Effect.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs index 01c0c764b..13494a5cd 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs @@ -247,15 +247,15 @@ public async Task Clear(string id) } } - public Task WhenAny(string id, params Task[] tasks) - => Capture(id, work: async () => await await Task.WhenAny(tasks)); - public Task WhenAll(string id, params Task[] tasks) - => Capture(id, work: () => Task.WhenAll(tasks)); + public Task WhenAny(string alias, params Task[] tasks) + => Capture(alias, work: async () => await await Task.WhenAny(tasks)); + public Task WhenAll(string alias, params Task[] tasks) + => Capture(alias, work: () => Task.WhenAll(tasks)); public Task WhenAny(params Task[] tasks) - => WhenAny(EffectContext.CurrentContext.NextImplicitId(), tasks); + => Capture(work: async () => await await Task.WhenAny(tasks)); public Task WhenAll(params Task[] tasks) - => WhenAll(EffectContext.CurrentContext.NextImplicitId(), tasks); + => Capture(work: async () => await Task.WhenAll(tasks)); internal string TakeNextImplicitId() => EffectContext.CurrentContext.NextImplicitId(); From 4519bf3586bc930164c1c22e08c34a7f63af96fc Mon Sep 17 00:00:00 2001 From: stidsborg Date: Mon, 17 Nov 2025 18:33:47 +0100 Subject: [PATCH 6/7] Removed FloowRegisteredTimeouts's PendingTimeouts method --- .../CoreRuntime/FlowRegisteredTimeouts.cs | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowRegisteredTimeouts.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowRegisteredTimeouts.cs index b2524b2c5..ba7cad2f0 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowRegisteredTimeouts.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowRegisteredTimeouts.cs @@ -16,7 +16,6 @@ public interface IRegisteredTimeouts : IDisposable Task> RegisterTimeout(EffectId timeoutId, TimeSpan expiresIn, bool publishMessage); Task CancelTimeout(EffectId timeoutId); Task CompleteTimeout(EffectId timeoutId); - Task> PendingTimeouts(); } public enum TimeoutStatus @@ -107,27 +106,6 @@ public async Task CompleteTimeout(EffectId timeoutId) flowMinimumTimeout.RemoveTimeout(timeoutId); } - public Task> PendingTimeouts() => PendingTimeouts(effect); - - public static async Task> PendingTimeouts(Effect effect) - { - var timeoutIds = effect.EffectIds.Where(id => id.Type == EffectType.Timeout); - var timeouts = new List(); - foreach (var timeoutId in timeoutIds) - { - var value = await effect.Get(timeoutId); - var values = value.Split("_"); - var status = values[0].ToInt().ToEnum(); - if (status is TimeoutStatus.Cancelled or TimeoutStatus.Completed) - continue; - - var expiresAt = values[1].ToLong().ToUtc(); - timeouts.Add(new RegisteredTimeout(timeoutId, expiresAt, TimeoutStatus.Registered)); - } - - return timeouts; - } - public void Dispose() => _disposed = true; private void RegisterTimeoutEventPublish(EffectId timeoutId, DateTime expiresAt) From 19f9584fbf7f5b878c68f04b9c429bed9bd1de30 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Mon, 17 Nov 2025 19:07:33 +0100 Subject: [PATCH 7/7] wip --- .../FunctionTests/EffectTests.cs | 50 ++++++------------- 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs index 574a612ad..fcf0abf0d 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs @@ -662,43 +662,23 @@ await effectStore public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task storeTask) { var store = await storeTask; - var storedId = TestStoredId.Create(); - var session = await store.CreateFunction( - storedId, - "SomeInstance", - param: null, - leaseExpiration: 0, - postponeUntil: null, - timestamp: 0, - parent: null, - owner: ReplicaId.NewId() - ); - - var effectStore = store.EffectsStore; - var effectResults = new EffectResults( - TestFlowId.Create(), - storedId, - lazyExistingEffects: new Lazy>>( - () => new List().CastTo>().ToTask() - ), - effectStore, - DefaultSerializer.Instance, - session - ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowMinimumTimeout()); - - var result = await effect.Capture("1", () => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush); - result.ShouldBe("hello world"); + var id = TestFlowId.Create(); - await effectStore.GetEffectResults(storedId).ShouldBeEmptyAsync(); + using var registry = new FunctionsRegistry(store); + var afterCaptureFlag = new SyncedFlag(); + var completeFunction = new SyncedFlag(); + var registration = registry.RegisterParamless( + id.Type, + inner: async workflow => + { + await workflow.Effect.Capture("Alias", () => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush); + afterCaptureFlag.Raise(); + await completeFunction.WaitForRaised(); + }); + + registration.Schedule(storedId.) + registration.ControlPanel() - await effect.Capture("2", () => "hello universe"); - - var storedEffects = await effectStore.GetEffectResults(storedId); - storedEffects.Count.ShouldBe(2); - // "1" and "2" are now aliases - storedEffects.Single(se => se.Alias == "1").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello world"); - storedEffects.Single(se => se.Alias == "2").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo().ShouldBe("hello universe"); } public abstract Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow();