diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectIdTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectIdTests.cs index 41f03550c..16ef8afd5 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectIdTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectIdTests.cs @@ -11,82 +11,82 @@ public class EffectIdTests [TestMethod] public void EffectIdWithStateCanBeDeserialized() { - var effectId = new EffectId("SomeValue", EffectType.State, Context: ""); + var effectId = new EffectId("SomeValue", ""); var serializedId = effectId.Serialize(); var deserializedId = EffectId.Deserialize(serializedId); - + deserializedId.ShouldBe(effectId); } - + [TestMethod] public void EffectIdWithContextCanBeDeserialized() { - var parentEffect = new EffectId("SomeParentId", EffectType.Effect, Context: "ESomeParentContext"); - var effectId = new EffectId("SomeValue", EffectType.State, Context: parentEffect.Serialize().Value); + var parentEffect = new EffectId("SomeParentId", "ESomeParentContext"); + var effectId = new EffectId("SomeValue", parentEffect.Serialize().Value); var serializedId = effectId.Serialize(); var deserializedId = EffectId.Deserialize(serializedId); - + deserializedId.ShouldBe(effectId); } - + [TestMethod] public void EffectIdWithContextAndEscapedCharactersCanBeDeserialized() { - var parentEffect = new EffectId("SomeParentId", EffectType.Effect, Context: ""); - var effectId = new EffectId("Some.Value\\WithBackSlash", EffectType.State, Context: parentEffect.Serialize().Value); + var parentEffect = new EffectId("SomeParentId", ""); + var effectId = new EffectId("Some.Value\\WithBackSlash", parentEffect.Serialize().Value); var serializedId = effectId.Serialize(); var deserializedId = EffectId.Deserialize(serializedId); - + deserializedId.ShouldBe(effectId); } - + [TestMethod] public void EffectIdWithBackslashIsSerializedCorrectly() { - var effectId = new EffectId("\\", EffectType.State, Context: ""); + var effectId = new EffectId("\\", ""); var serializedId = effectId.Serialize(); - serializedId.Value.ShouldBe("S\\\\"); + serializedId.Value.ShouldBe("E\\\\"); var deserializedId = EffectId.Deserialize(serializedId); deserializedId.ShouldBe(effectId); } - + [TestMethod] public void EffectIdWithDotIsSerializedCorrectly() { - var effectId = new EffectId(".", EffectType.State, Context: ""); + var effectId = new EffectId(".", ""); var serializedId = effectId.Serialize(); - serializedId.Value.ShouldBe("S\\."); + serializedId.Value.ShouldBe("E\\."); var deserializedId = EffectId.Deserialize(serializedId); deserializedId.ShouldBe(effectId); } - + [TestMethod] public void EffectIdWithoutStateCanBeDeserialized() { - var effectId = new EffectId("SomeValue", EffectType.Effect, Context: ""); + var effectId = new EffectId("SomeValue", ""); var serializedId = effectId.Serialize(); var deserializedId = EffectId.Deserialize(serializedId); - + deserializedId.ShouldBe(effectId); } [TestMethod] public void StoredEffectIdIsBasedOnSerializedEffectIdValue() { - var effectId = new EffectId("SomeId", EffectType.Effect, Context: new EffectId("SomeParentId", EffectType.Effect, Context: "ESomeParentContext").Serialize().Value); + var effectId = new EffectId("SomeId", new EffectId("SomeParentId", "ESomeParentContext").Serialize().Value); var serializedEffectId = effectId.Serialize(); var storedEffectId = effectId.ToStoredEffectId(); storedEffectId.Value.ShouldBe(StoredIdFactory.FromString(serializedEffectId.Value)); } - + [TestMethod] public void EffectIdWithEmptyIdAndContextCanBeDeserialized() { - var effectId = new EffectId("", EffectType.State, Context: ""); + var effectId = new EffectId("", ""); var serializedId = effectId.Serialize(); var deserializedId = EffectId.Deserialize(serializedId); - + deserializedId.ShouldBe(effectId); } } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoredEffectSerializationTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoredEffectSerializationTests.cs index 1bb30082c..949d7bba5 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoredEffectSerializationTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoredEffectSerializationTests.cs @@ -11,7 +11,7 @@ public class StoredEffectSerializationTests [TestMethod] public void CompletedStoredEffectWithResultCanBeSerializedAndDeserialized() { - var effectId = new EffectId("SomeEffect", EffectType.Effect, Context: ""); + var effectId = new EffectId("SomeEffect", ""); var result = "SomeResult"u8.ToArray(); var storedEffect = StoredEffect.CreateCompleted(effectId, result); @@ -27,7 +27,7 @@ public void CompletedStoredEffectWithResultCanBeSerializedAndDeserialized() [TestMethod] public void CompletedStoredEffectWithoutResultCanBeSerializedAndDeserialized() { - var effectId = new EffectId("SomeEffect", EffectType.Effect, Context: ""); + var effectId = new EffectId("SomeEffect", ""); var storedEffect = StoredEffect.CreateCompleted(effectId); var serialized = storedEffect.Serialize(); @@ -42,7 +42,7 @@ public void CompletedStoredEffectWithoutResultCanBeSerializedAndDeserialized() [TestMethod] public void StartedStoredEffectCanBeSerializedAndDeserialized() { - var effectId = new EffectId("SomeEffect", EffectType.Effect, Context: ""); + var effectId = new EffectId("SomeEffect", ""); var storedEffect = StoredEffect.CreateStarted(effectId); var serialized = storedEffect.Serialize(); @@ -57,7 +57,7 @@ public void StartedStoredEffectCanBeSerializedAndDeserialized() [TestMethod] public void FailedStoredEffectWithExceptionCanBeSerializedAndDeserialized() { - var effectId = new EffectId("SomeEffect", EffectType.Effect, Context: ""); + var effectId = new EffectId("SomeEffect", ""); var storedException = new StoredException( ExceptionMessage: "Something went wrong", ExceptionStackTrace: "at SomeMethod() in SomeFile.cs:line 42", @@ -77,27 +77,11 @@ public void FailedStoredEffectWithExceptionCanBeSerializedAndDeserialized() deserialized.StoredException.ExceptionType.ShouldBe("System.InvalidOperationException"); } - [TestMethod] - public void StoredEffectWithStateTypeCanBeSerializedAndDeserialized() - { - var effectId = new EffectId("SomeState", EffectType.State, Context: ""); - var result = "{\"key\":\"value\"}"u8.ToArray(); - var storedEffect = StoredEffect.CreateCompleted(effectId, result); - - var serialized = storedEffect.Serialize(); - var deserialized = StoredEffect.Deserialize(serialized); - - deserialized.EffectId.ShouldBe(effectId); - deserialized.EffectId.Type.ShouldBe(EffectType.State); - deserialized.WorkStatus.ShouldBe(WorkStatus.Completed); - deserialized.Result.ShouldBe(result); - } - [TestMethod] public void StoredEffectWithContextCanBeSerializedAndDeserialized() { - var parentEffect = new EffectId("ParentEffect", EffectType.Effect, Context: ""); - var effectId = new EffectId("ChildEffect", EffectType.Effect, Context: parentEffect.Serialize().Value); + var parentEffect = new EffectId("ParentEffect", ""); + var effectId = new EffectId("ChildEffect", parentEffect.Serialize().Value); var result = "SomeData"u8.ToArray(); var storedEffect = StoredEffect.CreateCompleted(effectId, result); @@ -110,52 +94,10 @@ public void StoredEffectWithContextCanBeSerializedAndDeserialized() deserialized.Result.ShouldBe(result); } - [TestMethod] - public void StoredEffectWithTimeoutTypeCanBeSerializedAndDeserialized() - { - var effectId = new EffectId("SomeTimeout", EffectType.Timeout, Context: ""); - var storedEffect = StoredEffect.CreateStarted(effectId); - - var serialized = storedEffect.Serialize(); - var deserialized = StoredEffect.Deserialize(serialized); - - deserialized.EffectId.ShouldBe(effectId); - deserialized.EffectId.Type.ShouldBe(EffectType.Timeout); - deserialized.WorkStatus.ShouldBe(WorkStatus.Started); - } - - [TestMethod] - public void StoredEffectWithRetryTypeCanBeSerializedAndDeserialized() - { - var effectId = new EffectId("SomeRetry", EffectType.Retry, Context: ""); - var storedEffect = StoredEffect.CreateCompleted(effectId); - - var serialized = storedEffect.Serialize(); - var deserialized = StoredEffect.Deserialize(serialized); - - deserialized.EffectId.ShouldBe(effectId); - deserialized.EffectId.Type.ShouldBe(EffectType.Retry); - deserialized.WorkStatus.ShouldBe(WorkStatus.Completed); - } - - [TestMethod] - public void StoredEffectWithSystemTypeCanBeSerializedAndDeserialized() - { - var effectId = new EffectId("SomeSystem", EffectType.System, Context: ""); - var storedEffect = StoredEffect.CreateCompleted(effectId); - - var serialized = storedEffect.Serialize(); - var deserialized = StoredEffect.Deserialize(serialized); - - deserialized.EffectId.ShouldBe(effectId); - deserialized.EffectId.Type.ShouldBe(EffectType.System); - deserialized.WorkStatus.ShouldBe(WorkStatus.Completed); - } - [TestMethod] public void StoredEffectWithLargeResultCanBeSerializedAndDeserialized() { - var effectId = new EffectId("LargeEffect", EffectType.Effect, Context: ""); + var effectId = new EffectId("LargeEffect", ""); var largeResult = new byte[10000]; for (int i = 0; i < largeResult.Length; i++) largeResult[i] = (byte)(i % 256); @@ -173,7 +115,7 @@ public void StoredEffectWithLargeResultCanBeSerializedAndDeserialized() [TestMethod] public void StoredEffectWithSpecialCharactersInIdCanBeSerializedAndDeserialized() { - var effectId = new EffectId("Effect.With\\Special.Characters", EffectType.Effect, Context: ""); + var effectId = new EffectId("Effect.With\\Special.Characters", ""); var result = "Data"u8.ToArray(); var storedEffect = StoredEffect.CreateCompleted(effectId, result); @@ -189,7 +131,7 @@ public void StoredEffectWithSpecialCharactersInIdCanBeSerializedAndDeserialized( [TestMethod] public void StoredEffectWithEmptyIdCanBeSerializedAndDeserialized() { - var effectId = new EffectId("", EffectType.Effect, Context: ""); + var effectId = new EffectId("", ""); var storedEffect = StoredEffect.CreateCompleted(effectId); var serialized = storedEffect.Serialize(); @@ -203,7 +145,7 @@ public void StoredEffectWithEmptyIdCanBeSerializedAndDeserialized() [TestMethod] public void StoredEffectWithNullStackTraceCanBeSerializedAndDeserialized() { - var effectId = new EffectId("FailedEffect", EffectType.Effect, Context: ""); + var effectId = new EffectId("FailedEffect", ""); var storedException = new StoredException( ExceptionMessage: "Error occurred", ExceptionStackTrace: null, @@ -221,4 +163,4 @@ public void StoredEffectWithNullStackTraceCanBeSerializedAndDeserialized() deserialized.StoredException.ExceptionStackTrace.ShouldBeNull(); deserialized.StoredException.ExceptionType.ShouldBe("System.Exception"); } -} \ No newline at end of file +} diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs index 281456894..48e341b83 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs @@ -1154,7 +1154,7 @@ protected async Task EffectsAreUpdatedAfterRefresh(Task storeTas await secondControlPanel.Refresh(); await secondControlPanel.Effects.GetValue("Id").ShouldBeAsync("SomeResult"); - await secondControlPanel.Effects.GetStatus("Id".ToEffectId(EffectType.Effect)).ShouldBeAsync(WorkStatus.Completed); + await secondControlPanel.Effects.GetStatus("Id".ToEffectId()).ShouldBeAsync(WorkStatus.Completed); unhandledExceptionCatcher.ShouldNotHaveExceptions(); } @@ -1429,7 +1429,7 @@ await workflow.Effect.Capture("AlwaysFail", () => try { - var timeoutEvent = new TimeoutEvent(EffectId.CreateWithRootContext("SomeTimeout", EffectType.Timeout), DateTime.UtcNow) + var timeoutEvent = new TimeoutEvent(EffectId.CreateWithRootContext("SomeTimeout"), DateTime.UtcNow) .ToMessageAndIdempotencyKey(); await registration.Invoke( diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs index dd06c57c1..7ba94b86b 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs @@ -621,12 +621,10 @@ public async Task DelayedFlushIsReflectedInUnderlyingStoreForSet(Task r.Count == 0) .ShouldBeTrueAsync(); - var effectId2 = new EffectId("Id2", EffectType.Effect, Context: ""); - var storedEffect2 = new StoredEffect( + var effectId2 = new EffectId("Id2", ""); + var storedEffect2 = StoredEffect.CreateCompleted( effectId2, - WorkStatus.Completed, - Result: "hello universe".ToUtf8Bytes(), - StoredException: null + "hello universe".ToUtf8Bytes() ); await effectResults.Set(storedEffect2, flush: true); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/TimeoutTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/TimeoutTests.cs index 08dad3971..cc390dc72 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/TimeoutTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/TimeoutTests.cs @@ -162,7 +162,7 @@ await controlPanel.BusyWaitUntil(cp => var id = registeredTimeout.TimeoutId; id.Id.ShouldBe("TimeoutId4321"); - id.Type.ShouldBe(EffectType.Timeout); + // Type is Timeout (guaranteed by RegisteredTimeouts API) await controlPanel.RegisteredTimeouts.Remove(id); @@ -208,7 +208,7 @@ protected async Task PendingTimeoutCanBeUpdatedFromControlPanel(Task ); session.ShouldBeNull(); - await effectsStore.SetEffectResult(functionId, new StoredEffect("".ToEffectId(EffectType.State), WorkStatus.Completed, "some default state".ToUtf8Bytes(), StoredException: null).ToStoredChange(functionId, Insert), session: null); + await effectsStore.SetEffectResult(functionId, StoredEffect.CreateCompleted("".ToEffectId(), "some default state".ToUtf8Bytes()).ToStoredChange(functionId, Insert), session: null); var storedEffects = await effectsStore.GetEffectResults(functionId); storedEffects.Count.ShouldBe(1); - storedEffects.Single().EffectId.ShouldBe("".ToEffectId(EffectType.State)); + storedEffects.Single().EffectId.ShouldBe("".ToEffectId()); storedEffects.Single().Result.ShouldBe("some default state".ToUtf8Bytes()); } @@ -1387,19 +1387,15 @@ protected async Task FunctionCanBeCreatedWithMessagesAndEffects(Task r.EffectId.Id == "SomeEffect1"); - effectResult1.EffectId.ShouldBe(new EffectId("SomeEffect1", EffectType.State, Context: "")); + effectResult1.EffectId.ShouldBe(new EffectId("SomeEffect1", "")); effectResult1.Result!.ToStringFromUtf8Bytes().ShouldBe("hello world"); var effectResult2 = effectResults.Single(r => r.EffectId.Id == "SomeEffect2"); - effectResult2.EffectId.ShouldBe(new EffectId("SomeEffect2", EffectType.State, Context: "")); + effectResult2.EffectId.ShouldBe(new EffectId("SomeEffect2", "")); effectResult2.Result!.ToStringFromUtf8Bytes().ShouldBe("hello universe"); var messages = await store.MessageStore.GetMessages(storedId, skip: 0); @@ -1538,19 +1534,15 @@ protected async Task FunctionCanBeCreatedWithEffectsOnly(Task st var store = await storeTask; var paramJson = PARAM.ToJson(); - var effectId1 = new EffectId("SomeEffect1", EffectType.State, Context: ""); - var effect1 = new StoredEffect( + var effectId1 = new EffectId("SomeEffect1", ""); + var effect1 = StoredEffect.CreateCompleted( effectId1, - WorkStatus.Completed, - Result: "hello world".ToUtf8Bytes(), - StoredException: null + "hello world".ToUtf8Bytes() ); - var effectId2 = new EffectId("SomeEffect2", EffectType.State, Context: ""); - var effect2 = new StoredEffect( + var effectId2 = new EffectId("SomeEffect2", ""); + var effect2 = StoredEffect.CreateCompleted( effectId2, - WorkStatus.Completed, - Result: "hello universe".ToUtf8Bytes(), - StoredException: null + "hello universe".ToUtf8Bytes() ); var session = await store.CreateFunction( @@ -1570,10 +1562,10 @@ protected async Task FunctionCanBeCreatedWithEffectsOnly(Task st var effectResults = await store.EffectsStore.GetEffectResults(storedId); effectResults.Count.ShouldBe(2); var effectResult1 = effectResults.Single(r => r.EffectId.Id == "SomeEffect1"); - effectResult1.EffectId.ShouldBe(new EffectId("SomeEffect1", EffectType.State, Context: "")); + effectResult1.EffectId.ShouldBe(new EffectId("SomeEffect1", "")); effectResult1.Result!.ToStringFromUtf8Bytes().ShouldBe("hello world"); var effectResult2 = effectResults.Single(r => r.EffectId.Id == "SomeEffect2"); - effectResult2.EffectId.ShouldBe(new EffectId("SomeEffect2", EffectType.State, Context: "")); + effectResult2.EffectId.ShouldBe(new EffectId("SomeEffect2", "")); effectResult2.Result!.ToStringFromUtf8Bytes().ShouldBe("hello universe"); var messages = await store.MessageStore.GetMessages(storedId, skip: 0); diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowRegisteredTimeouts.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowRegisteredTimeouts.cs index b2524b2c5..2b1adc714 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowRegisteredTimeouts.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowRegisteredTimeouts.cs @@ -67,9 +67,9 @@ public async Task> RegisterTimeout(EffectId timeo } public Task> RegisterTimeout(string timeoutId, TimeSpan expiresIn, bool publishMessage) - => RegisterTimeout(EffectId.CreateWithCurrentContext(timeoutId, EffectType.Timeout), expiresAt: utcNow().Add(expiresIn), publishMessage); + => RegisterTimeout(EffectId.CreateWithCurrentContext(timeoutId), expiresAt: utcNow().Add(expiresIn), publishMessage); public Task> RegisterTimeout(string timeoutId, DateTime expiresAt, bool publishMessage) - => RegisterTimeout(EffectId.CreateWithCurrentContext(timeoutId, EffectType.Timeout), expiresAt, publishMessage); + => RegisterTimeout(EffectId.CreateWithCurrentContext(timeoutId), expiresAt, publishMessage); public Task> RegisterTimeout(EffectId timeoutId, TimeSpan expiresIn, bool publishMessage) => RegisterTimeout(timeoutId, expiresAt: utcNow().Add(expiresIn), publishMessage); @@ -111,18 +111,31 @@ public async Task CompleteTimeout(EffectId timeoutId) public static async Task> PendingTimeouts(Effect effect) { - var timeoutIds = effect.EffectIds.Where(id => id.Type == EffectType.Timeout); + var timeoutIds = effect.EffectIds; var timeouts = new List(); foreach (var timeoutId in timeoutIds) { - var value = await effect.Get(timeoutId); + var valueOption = await effect.TryGet(timeoutId); + if (!valueOption.HasValue) + continue; + + var value = valueOption.Value; + if (!value.Contains('_')) + continue; // Not a timeout effect + var values = value.Split("_"); - var status = values[0].ToInt().ToEnum(); + if (values.Length != 2) + continue; // Not a timeout effect + + if (!int.TryParse(values[0], out var statusInt) || !long.TryParse(values[1], out var ticks)) + continue; // Not a timeout effect + + var status = statusInt.ToEnum(); if (status is TimeoutStatus.Cancelled or TimeoutStatus.Completed) continue; - - var expiresAt = values[1].ToLong().ToUtc(); - timeouts.Add(new RegisteredTimeout(timeoutId, expiresAt, TimeoutStatus.Registered)); + + var expiresAt = ticks.ToUtc(); + timeouts.Add(new RegisteredTimeout(timeoutId, expiresAt, TimeoutStatus.Registered)); } return timeouts; diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs index 0bebe2cdc..b187615ac 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs @@ -495,19 +495,25 @@ public InnerScheduled CreateInnerScheduled(List scheduledIds, W public IReadOnlyList MapInitialEffects(IEnumerable initialEffects, FlowId flowId) => initialEffects .Select(e => - e.Exception == null + { + var effectId = e.Id.ToEffectId(); + var serializedEffectId = effectId.Serialize(); + return e.Exception == null ? new StoredEffect( - e.Id.ToEffectId(EffectType.Effect), + effectId, e.Status ?? WorkStatus.Completed, Result: Serializer.Serialize(e.Value, e.Value?.GetType() ?? typeof(object)), - StoredException: null) + StoredException: null, + Alias: null, + SerializedEffectId: serializedEffectId) : new StoredEffect( - e.Id.ToEffectId(EffectType.Effect), + effectId, WorkStatus.Failed, Result: null, - StoredException: Serializer.SerializeException(FatalWorkflowException.CreateNonGeneric(flowId, e.Exception)) - ) - ).ToList(); + StoredException: Serializer.SerializeException(FatalWorkflowException.CreateNonGeneric(flowId, e.Exception)), + Alias: null, + SerializedEffectId: serializedEffectId); + }).ToList(); public IReadOnlyList MapInitialMessages(IEnumerable initialMessages) => initialMessages.Select(m => diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs index 7614e9141..a9fa6414a 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs @@ -44,7 +44,7 @@ public void Deconstruct(out Effect effect, out Messages messages) public async Task Delay(DateTime until, bool suspend = true, string? effectId = null) { effectId ??= $"Delay#{Effect.TakeNextImplicitId()}"; - var timeoutId = EffectId.CreateWithCurrentContext(effectId, EffectType.Timeout); + var timeoutId = EffectId.CreateWithCurrentContext(effectId); var (status, expiry) = await Messages.FlowRegisteredTimeouts.RegisterTimeout( timeoutId, diff --git a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs index 8301d2456..0e0d1a20b 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs @@ -17,7 +17,7 @@ public enum ResiliencyLevel public class Effect(EffectResults effectResults, UtcNow utcNow, FlowMinimumTimeout flowMinimumTimeout) { - public async Task Contains(string id) => await Contains(CreateEffectId(id, EffectType.Effect)); + public async Task Contains(string id) => await Contains(CreateEffectId(id)); internal Task Contains(EffectId effectId) => effectResults.Contains(effectId); internal IEnumerable EffectIds => effectResults.EffectIds; @@ -44,15 +44,15 @@ public async Task Mark(string id) public Task CreateOrGet(string id, T value, bool flush = true) => CreateOrGet(CreateEffectId(id), value, flush); internal Task CreateOrGet(EffectId effectId, T value, bool flush) => effectResults.CreateOrGet(effectId, value, flush); - public async Task Upsert(string id, T value, bool flush = true) => await Upsert(CreateEffectId(id, EffectType.Effect), value, flush); + public async Task Upsert(string id, T value, bool flush = true) => await Upsert(CreateEffectId(id), value, flush); internal Task Upsert(EffectId effectId, T value, bool flush) => effectResults.Upsert(effectId, value, flush); internal Task Upserts(IEnumerable> values, bool flush) => effectResults.Upserts(values, flush); - public async Task> TryGet(string id) => await TryGet(CreateEffectId(id, EffectType.Effect)); + public async Task> TryGet(string id) => await TryGet(CreateEffectId(id)); internal Task> TryGet(EffectId effectId) => effectResults.TryGet(effectId); - public async Task Get(string id) => await Get(CreateEffectId(id, EffectType.Effect)); + public async Task Get(string id) => await Get(CreateEffectId(id)); internal async Task Get(EffectId effectId) { var option = await TryGet(effectId); @@ -89,46 +89,46 @@ public Task Capture(string id, Action work, ResiliencyLevel resiliency = Resilie => Capture(id, work: () => { work(); return Task.CompletedTask; }, resiliency); public Task Capture(string id, Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) => Capture(id, work: () => work().ToTask(), resiliency); - public async Task Capture(string id, Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) - => await InnerCapture(id, EffectType.Effect, work, resiliency, EffectContext.CurrentContext, retryPolicy: null); - public async Task Capture(string id, Func> work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) - => await InnerCapture(id, EffectType.Effect, work, resiliency, EffectContext.CurrentContext, retryPolicy: null); - + public async Task Capture(string id, Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) + => await InnerCapture(id, work, resiliency, EffectContext.CurrentContext, retryPolicy: null); + public async Task Capture(string id, Func> work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) + => await InnerCapture(id, work, resiliency, EffectContext.CurrentContext, retryPolicy: null); + public Task Capture(string id, Action work, RetryPolicy retryPolicy, bool flush = true) => Capture(id, work: () => { work(); return Task.CompletedTask; }, retryPolicy, flush); public Task Capture(string id, Func work, RetryPolicy retryPolicy, bool flush = true) => Capture(id, work: () => work().ToTask(), retryPolicy, flush); - public async Task Capture(string id, Func work, RetryPolicy retryPolicy, bool flush = true) - => await InnerCapture(id, EffectType.Effect, work, flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, EffectContext.CurrentContext, retryPolicy); - public async Task Capture(string id, Func> work, RetryPolicy retryPolicy, bool flush = true) - => await InnerCapture(id, EffectType.Effect, work, flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, EffectContext.CurrentContext, retryPolicy); + public async Task Capture(string id, Func work, RetryPolicy retryPolicy, bool flush = true) + => await InnerCapture(id, work, flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, EffectContext.CurrentContext, retryPolicy); + public async Task Capture(string id, Func> work, RetryPolicy retryPolicy, bool flush = true) + => await InnerCapture(id, work, flush ? ResiliencyLevel.AtLeastOnce : ResiliencyLevel.AtLeastOnceDelayFlush, EffectContext.CurrentContext, retryPolicy); - private async Task InnerCapture(string id, EffectType effectType, Func work, ResiliencyLevel resiliency, EffectContext effectContext, RetryPolicy? retryPolicy) + private async Task InnerCapture(string id, Func work, ResiliencyLevel resiliency, EffectContext effectContext, RetryPolicy? retryPolicy) { if (retryPolicy != null && resiliency == ResiliencyLevel.AtMostOnce) throw new InvalidOperationException("Retry policy cannot be used with AtMostOnce resiliency"); if (retryPolicy == null) - await effectResults.InnerCapture(id, effectType, work, resiliency, effectContext); + await effectResults.InnerCapture(id, work, resiliency, effectContext); else await effectResults.InnerCapture( - id, effectType, + id, work: () => retryPolicy.Invoke(work, effect: this, utcNow, flowMinimumTimeout), resiliency, effectContext ); } - - private async Task InnerCapture(string id, EffectType effectType, Func> work, ResiliencyLevel resiliency, EffectContext effectContext, RetryPolicy? retryPolicy) + + private async Task InnerCapture(string id, Func> work, ResiliencyLevel resiliency, EffectContext effectContext, RetryPolicy? retryPolicy) { if (retryPolicy != null && resiliency == ResiliencyLevel.AtMostOnce) throw new InvalidOperationException("Retry policy cannot be used with AtMostOnce resiliency"); - + if (retryPolicy == null) - return await effectResults.InnerCapture(id, effectType, work, resiliency, effectContext); + return await effectResults.InnerCapture(id, work, resiliency, effectContext); return await effectResults.InnerCapture( - id, effectType, + id, work: () => retryPolicy.Invoke(work, effect: this, utcNow, flowMinimumTimeout), resiliency, effectContext @@ -149,8 +149,8 @@ public Task WhenAll(params Task[] tasks) internal string TakeNextImplicitId() => EffectContext.CurrentContext.NextImplicitId(); - internal EffectId CreateEffectId(string id, EffectType? type = null) - => id.ToEffectId(type, context: EffectContext.CurrentContext.Parent?.Serialize().Value); + internal EffectId CreateEffectId(string id) + => id.ToEffectId(context: EffectContext.CurrentContext.Parent?.Serialize().Value); public Task Flush() => effectResults.Flush(); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Domain/EffectId.cs b/Core/Cleipnir.ResilientFunctions/Domain/EffectId.cs index 0b910e93d..eb6c9fbea 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/EffectId.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/EffectId.cs @@ -3,18 +3,18 @@ namespace Cleipnir.ResilientFunctions.Domain; -public record EffectId(string Id, EffectType Type, string Context) +public record EffectId(string Id, string Context) { public SerializedEffectId Serialize() { - var (id, type, context) = this; + var (id, context) = this; if (id.All(c => c != '.' && c != '\\')) return new SerializedEffectId( context == "" - ? $"{(char)type}{id}" - : $"{context}.{(char)type}{id}" + ? $"E{id}" + : $"{context}.E{id}" ); - + var escapedIdList = new List(id.Length * 2); foreach (var idChar in id) switch (idChar) @@ -31,12 +31,12 @@ public SerializedEffectId Serialize() escapedIdList.Add(idChar); break; } - + var escapedId = new string(escapedIdList.ToArray()); return new SerializedEffectId( context == "" - ? $"{(char)type}{escapedId}" - : $"{context}.{(char)type}{escapedId}" + ? $"E{escapedId}" + : $"{context}.E{escapedId}" ); } @@ -50,29 +50,28 @@ public static EffectId Deserialize(string serialized) if (serialized[pos] == '.') pos++; - - var type = (EffectType) serialized[pos]; - var context = pos - 1 < 0 ? "" : serialized[..(pos - 1)]; + + var context = pos - 1 < 0 ? "" : serialized[..(pos - 1)]; var id = serialized[(pos + 1)..]; if (id.Any(c => c == '\\')) { id = id.Replace("\\.", ".").Replace("\\\\", "\\"); } - return new EffectId(id, type, context); + return new EffectId(id, context); } - public static EffectId CreateWithRootContext(string id, EffectType effectType = EffectType.Effect) - => new(id, effectType, Context: ""); - - public static EffectId CreateWithCurrentContext(string id, EffectType effectType) - => new(id, effectType, EffectContext.CurrentContext.Parent?.Serialize().Value ?? ""); + public static EffectId CreateWithRootContext(string id) + => new(id, Context: ""); + + public static EffectId CreateWithCurrentContext(string id) + => new(id, EffectContext.CurrentContext.Parent?.Serialize().Value ?? ""); } public record SerializedEffectId(string Value); public static class EffectIdExtensions { - public static EffectId ToEffectId(this string value, EffectType? effectType = null, string? context = null) => new(value, effectType ?? EffectType.Effect, context ?? ""); + public static EffectId ToEffectId(this string value, string? context = null) => new(value, context ?? ""); public static SerializedEffectId ToSerializedEffectId(this string value) => new(value); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs b/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs index def6ee014..42dc96316 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs @@ -23,14 +23,14 @@ public class EffectResults( private volatile bool _initialized; private readonly Dictionary _effectResults = new(); - + public IEnumerable EffectIds => _effectResults.Keys.ToList(); private async Task InitializeIfRequired() { if (_initialized) return; - + var existingEffects = await lazyExistingEffects.Value; lock (_sync) { @@ -38,6 +38,7 @@ private async Task InitializeIfRequired() return; foreach (var existingEffect in existingEffects) + { _effectResults[existingEffect.EffectId] = new PendingEffectChange( existingEffect.EffectId, @@ -45,7 +46,8 @@ private async Task InitializeIfRequired() Operation: null, Existing: true ); - + } + _initialized = true; } } @@ -82,7 +84,7 @@ public async Task CreateOrGet(EffectId effectId, T value, bool flush) { if (_effectResults.TryGetValue(effectId, out var existing) && existing.StoredEffect?.WorkStatus == WorkStatus.Completed) return serializer.Deserialize(existing.StoredEffect.Result!); - + if (existing?.StoredEffect?.StoredException != null) throw serializer.DeserializeException(flowId, existing.StoredEffect.StoredException!); } @@ -94,14 +96,15 @@ await FlushOrAddToPending( flush, delete: false ); - + return value; } - + + internal async Task Upsert(EffectId effectId, T value, bool flush) { await InitializeIfRequired(); - + var storedEffect = StoredEffect.CreateCompleted(effectId, serializer.Serialize(value)); await FlushOrAddToPending( storedEffect.EffectId, @@ -110,7 +113,7 @@ await FlushOrAddToPending( delete: false ); } - + internal async Task Upserts(IEnumerable> values, bool flush) { await InitializeIfRequired(); @@ -149,11 +152,11 @@ public async Task> TryGet(EffectId effectId) return Option.NoValue; } - public async Task InnerCapture(string id, EffectType effectType, Func work, ResiliencyLevel resiliency, EffectContext effectContext) + public async Task InnerCapture(string id, Func work, ResiliencyLevel resiliency, EffectContext effectContext) { await InitializeIfRequired(); - var effectId = id.ToEffectId(effectType, context: effectContext.Parent?.Serialize().Value); + var effectId = id.ToEffectId(context: effectContext.Parent?.Serialize().Value); EffectContext.SetParent(effectId); lock (_sync) @@ -218,15 +221,15 @@ await FlushOrAddToPending( storedEffect, flush: resiliency != ResiliencyLevel.AtLeastOnceDelayFlush, delete: false - ); + ); } } - public async Task InnerCapture(string id, EffectType effectType, Func> work, ResiliencyLevel resiliency, EffectContext effectContext) + public async Task InnerCapture(string id, Func> work, ResiliencyLevel resiliency, EffectContext effectContext) { await InitializeIfRequired(); - var effectId = id.ToEffectId(effectType, context: effectContext.Parent?.Serialize().Value); + var effectId = id.ToEffectId(context: effectContext.Parent?.Serialize().Value); EffectContext.SetParent(effectId); lock (_sync) @@ -290,15 +293,15 @@ await FlushOrAddToPending( } { - var storedEffect = StoredEffect.CreateCompleted(effectId, serializer.Serialize(result)); + var storedEffect = StoredEffect.CreateCompleted(effectId, serializer.Serialize(result)); await FlushOrAddToPending( storedEffect.EffectId, storedEffect, flush: resiliency != ResiliencyLevel.AtLeastOnceDelayFlush, delete: false ); - - return result; + + return result; } } @@ -328,13 +331,14 @@ private void AddToPending(IEnumerable storedEffects) private void AddToPending(EffectId effectId, StoredEffect? storedEffect, bool delete) { lock (_sync) + { if (_effectResults.ContainsKey(effectId)) { var existing = _effectResults[effectId]; _effectResults[effectId] = existing with { StoredEffect = storedEffect, - Operation = delete + Operation = delete ? CrudOperation.Delete : (existing.Existing ? CrudOperation.Update : CrudOperation.Insert) }; @@ -348,6 +352,7 @@ private void AddToPending(EffectId effectId, StoredEffect? storedEffect, bool de Existing: false ); } + } } private async Task FlushOrAddToPending(EffectId effectId, StoredEffect? storedEffect, bool flush, bool delete) diff --git a/Core/Cleipnir.ResilientFunctions/Domain/EffectType.cs b/Core/Cleipnir.ResilientFunctions/Domain/EffectType.cs deleted file mode 100644 index beab192c3..000000000 --- a/Core/Cleipnir.ResilientFunctions/Domain/EffectType.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Cleipnir.ResilientFunctions.Domain; - -public enum EffectType -{ - Effect = 'E', - State = 'S', - System = 'Y', - Timeout = 'T', - Retry = 'R' -} - -public static class EffectTypeExtensions -{ - public static bool IsState(this EffectType effectType) => effectType == EffectType.State; - public static bool IsSystem(this EffectType effectType) => effectType == EffectType.System; - public static bool IsEffect(this EffectType effectType) => effectType == EffectType.Effect; - public static bool IsTimeout(this EffectType effectType) => effectType == EffectType.Timeout; - public static bool IsRetry(this EffectType effectType) => effectType == EffectType.Retry; -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs b/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs index 7242c1bbc..02fbde0af 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs @@ -11,7 +11,7 @@ public class ExistingEffects(StoredId storedId, FlowId flowId, IEffectsStore eff { private Dictionary? _storedEffects; - private async Task> GetStoredEffects() + internal async Task> GetStoredEffects() { if (_storedEffects is not null) return _storedEffects; @@ -55,9 +55,11 @@ public async Task GetStatus(EffectId effectId) public async Task RemoveFailed() { - foreach (var effectId in await AllIds) - if (await GetStatus(effectId) == WorkStatus.Failed || effectId.Type == EffectType.Retry) - await Remove(effectId); + // Remove all effects - this clears failed effects and any retry-tracking state + // Since we no longer distinguish effect types, we remove everything to ensure + // a clean slate for restart + foreach (var effectId in (await AllIds).ToList()) + await Remove(effectId); } public Task Remove(string effectId) => Remove(effectId.ToEffectId()); @@ -84,17 +86,17 @@ private async Task Set(StoredEffect storedEffect) public Task SetStarted(string effectId) => SetStarted(effectId.ToEffectId()); public Task SetStarted(EffectId effectId) - => Set(new StoredEffect(effectId, WorkStatus.Started, Result: null, StoredException: null)); + => Set(StoredEffect.CreateStarted(effectId)); public Task SetSucceeded(string effectId) => SetSucceeded(effectId.ToEffectId()); public Task SetSucceeded(EffectId effectId) - => Set(new StoredEffect(effectId, WorkStatus.Completed, Result: null, StoredException: null)); + => Set(StoredEffect.CreateCompleted(effectId)); public Task SetSucceeded(string effectId, TResult result) => SetSucceeded(effectId.ToEffectId(), result); public Task SetSucceeded(EffectId effectId, TResult result) - => Set(new StoredEffect(effectId, WorkStatus.Completed, Result: serializer.Serialize(result), StoredException: null)); + => Set(StoredEffect.CreateCompleted(effectId, serializer.Serialize(result))); public Task SetFailed(string effectId, Exception exception) => SetFailed(effectId.ToEffectId(), exception); public Task SetFailed(EffectId effectId, Exception exception) - => Set(new StoredEffect(effectId, WorkStatus.Failed, Result: null, StoredException: serializer.SerializeException(FatalWorkflowException.CreateNonGeneric(flowId, exception)))); + => Set(StoredEffect.CreateFailed(effectId, serializer.SerializeException(FatalWorkflowException.CreateNonGeneric(flowId, exception)))); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Domain/ExistingRegisteredTimeouts.cs b/Core/Cleipnir.ResilientFunctions/Domain/ExistingRegisteredTimeouts.cs index 4dddf37dd..d58b83def 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/ExistingRegisteredTimeouts.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/ExistingRegisteredTimeouts.cs @@ -17,22 +17,31 @@ private async Task>> GetTime if (_timeouts is not null) return _timeouts; - var effectIds = (await effects.AllIds).ToList(); - var timeoutIds = effectIds.Where(id => id.Type == EffectType.Timeout); + var allEffects = await effects.GetStoredEffects(); var timeouts = new Dictionary>(); - foreach (var timeoutId in timeoutIds) + foreach (var kv in allEffects) { + var timeoutId = kv.Key; var value = await effects.GetValue(timeoutId); - var values = value!.Split("_"); - var status = values[0].ToInt().ToEnum(); - var expiry = values[1].ToLong().ToUtc(); + if (value == null || !value.Contains('_')) + continue; // Not a timeout effect + + var values = value.Split("_"); + if (values.Length != 2) + continue; // Not a timeout effect + + if (!int.TryParse(values[0], out var statusInt) || !long.TryParse(values[1], out var ticks)) + continue; // Not a timeout effect + + var status = statusInt.ToEnum(); + var expiry = ticks.ToUtc(); timeouts[timeoutId] = Tuple.Create(expiry, status); } return _timeouts = timeouts; } - public Task this[TimeoutId timeoutId] => this[new EffectId(timeoutId.Value, EffectType.Timeout, Context: "")]; + public Task this[TimeoutId timeoutId] => this[new EffectId(timeoutId.Value, Context: "")]; public Task this[EffectId timeoutId] => GetTimeouts().ContinueWith(t => t.Result[timeoutId].Item1); public Task> All @@ -42,22 +51,22 @@ public Task> All .ToList() .CastTo>() ); - - public Task Remove(TimeoutId timeoutId) => Remove(new EffectId(timeoutId.Value, EffectType.Timeout, Context: "")); + + public Task Remove(TimeoutId timeoutId) => Remove(new EffectId(timeoutId.Value, Context: "")); public async Task Remove(EffectId timeoutId) { var timeouts = await GetTimeouts(); await effects.Remove(timeoutId); timeouts.Remove(timeoutId); } - + public Task Upsert(TimeoutId timeoutId, DateTime expiresAt) - => Upsert(new EffectId(timeoutId.Value, EffectType.Timeout, Context: ""), expiresAt); + => Upsert(new EffectId(timeoutId.Value, Context: ""), expiresAt); public async Task Upsert(EffectId timeoutId, DateTime expiresAt) { expiresAt = expiresAt.ToUniversalTime(); var timeouts = await GetTimeouts(); - await effects.SetValue(timeoutId, $"{(int)TimeoutStatus.Registered}_{expiresAt.Ticks}"); + await effects.SetSucceeded(timeoutId, $"{(int)TimeoutStatus.Registered}_{expiresAt.Ticks}"); timeouts[timeoutId] = new Tuple(expiresAt, TimeoutStatus.Registered); } public Task Upsert(TimeoutId timeoutId, TimeSpan expiresIn) diff --git a/Core/Cleipnir.ResilientFunctions/Domain/RetryPolicy.cs b/Core/Cleipnir.ResilientFunctions/Domain/RetryPolicy.cs index 19a074e28..df2a5afc0 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/RetryPolicy.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/RetryPolicy.cs @@ -105,7 +105,7 @@ public Task Invoke(Func work, Effect effect, UtcNow utcNow, FlowMinimumTim public async Task Invoke(Func> work, Effect effect, UtcNow utcNow, FlowMinimumTimeout flowMinimumTimeout) { - var delayUntilId = effect.CreateEffectId("DelayUntil", EffectType.Retry); + var delayUntilId = effect.CreateEffectId("DelayUntil"); var delayUntilOption = await effect.TryGet(delayUntilId); var delayUntil = delayUntilOption.HasValue ? delayUntilOption.Value.ToDateTime() : DateTime.MinValue; if (delayUntilOption.HasValue && delayUntil > utcNow()) @@ -114,7 +114,7 @@ public async Task Invoke(Func> work, Effect effect, UtcNow utcNow, throw new SuspendInvocationException(); } - var iterationId = effect.CreateEffectId("Iteration", EffectType.Retry); + var iterationId = effect.CreateEffectId("Iteration"); var iteration = await effect.CreateOrGet(iterationId, 0, flush: false); if (iteration >= maximumAttempts) throw new InvalidOperationException($"Retry attempts exceeded maximum attempts value '{maximumAttempts}'"); @@ -137,13 +137,8 @@ public async Task Invoke(Func> work, Effect effect, UtcNow utcNow, delayUntil = utcNow().Add(delay); iteration += 1; { - await effect.Upserts( - [ - Tuple.Create(delayUntilId, (object) delayUntil.Ticks), - Tuple.Create(iterationId, (object) iteration) - ], - flush: false - ); + await effect.Upsert(delayUntilId, delayUntil.Ticks, flush: false); + await effect.Upsert(iterationId, iteration, flush: false); } if (iteration >= maximumAttempts) diff --git a/Core/Cleipnir.ResilientFunctions/Reactive/Extensions/InnerOperators.cs b/Core/Cleipnir.ResilientFunctions/Reactive/Extensions/InnerOperators.cs index 28949d320..7949c6378 100644 --- a/Core/Cleipnir.ResilientFunctions/Reactive/Extensions/InnerOperators.cs +++ b/Core/Cleipnir.ResilientFunctions/Reactive/Extensions/InnerOperators.cs @@ -103,9 +103,9 @@ public static IReactiveChain TakeUntil(this IReactiveChain s, Func TakeUntilTimeout(this Messages s, string timeoutEventId, TimeSpan expiresIn) - => new TimeoutOperator(s.Source, EffectId.CreateWithCurrentContext(timeoutEventId, EffectType.Timeout), expiresAt: s.UtcNow().Add(expiresIn)); + => new TimeoutOperator(s.Source, EffectId.CreateWithCurrentContext(timeoutEventId), expiresAt: s.UtcNow().Add(expiresIn)); public static IReactiveChain TakeUntilTimeout(this Messages s, string timeoutEventId, DateTime expiresAt) - => new TimeoutOperator(s.Source, EffectId.CreateWithCurrentContext(timeoutEventId, EffectType.Timeout), expiresAt); + => new TimeoutOperator(s.Source, EffectId.CreateWithCurrentContext(timeoutEventId), expiresAt); public static IReactiveChain TakeUntilTimeout(this Messages s, TimeSpan expiresIn) => s.TakeUntilTimeout(s.FlowRegisteredTimeouts.GetNextImplicitId(), expiresIn); public static IReactiveChain TakeUntilTimeout(this Messages s, DateTime expiresAt) diff --git a/Core/Cleipnir.ResilientFunctions/Storage/Types.cs b/Core/Cleipnir.ResilientFunctions/Storage/Types.cs index 13307ee0c..fb0ab0def 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/Types.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/Types.cs @@ -121,7 +121,7 @@ public static StoredEffectId Create(EffectId effectId) public static class StoredEffectIdExtensions { - public static StoredEffectId ToStoredEffectId(this string effectId, EffectType effectType) => ToStoredEffectId(effectId.ToEffectId(effectType)); + public static StoredEffectId ToStoredEffectId(this string effectId) => ToStoredEffectId(effectId.ToEffectId()); public static StoredEffectId ToStoredEffectId(this EffectId effectId) => StoredEffectId.Create(effectId); } @@ -147,23 +147,25 @@ public record StoredEffect( WorkStatus WorkStatus, byte[]? Result, StoredException? StoredException, - string? Alias = null + string? Alias = null, + SerializedEffectId? SerializedEffectId = null ) { public StoredEffectId StoredEffectId => EffectId.ToStoredEffectId(); public static StoredEffect CreateCompleted(EffectId effectId, byte[] result) - => new(effectId, WorkStatus.Completed, result, StoredException: null); + => new(effectId, WorkStatus.Completed, result, StoredException: null, Alias: null, SerializedEffectId: effectId.Serialize()); public static StoredEffect CreateCompleted(EffectId effectId) - => new(effectId, WorkStatus.Completed, Result: null, StoredException: null); + => new(effectId, WorkStatus.Completed, Result: null, StoredException: null, Alias: null, SerializedEffectId: effectId.Serialize()); public static StoredEffect CreateStarted(EffectId effectId) - => new(effectId, WorkStatus.Started, Result: null, StoredException: null); + => new(effectId, WorkStatus.Started, Result: null, StoredException: null, Alias: null, SerializedEffectId: effectId.Serialize()); public static StoredEffect CreateFailed(EffectId effectId, StoredException storedException) - => new(effectId, WorkStatus.Failed, Result: null, storedException); + => new(effectId, WorkStatus.Failed, Result: null, storedException, Alias: null, SerializedEffectId: effectId.Serialize()); public byte[] Serialize() { - var effect = EffectId.Serialize().Value.ToUtf8Bytes(); + var effectSerialized = SerializedEffectId?.Value ?? EffectId.Serialize().Value; + var effect = effectSerialized.ToUtf8Bytes(); var status = (byte)WorkStatus; var result = Result; var exception = StoredException?.Serialize(); @@ -175,7 +177,8 @@ public byte[] Serialize() public static StoredEffect Deserialize(byte[] bytes) { var parts = BinaryPacker.Split(bytes); - var effect = EffectId.Deserialize(parts[0]!.ToStringFromUtf8Bytes()); + var serializedEffectId = parts[0]!.ToStringFromUtf8Bytes(); + var effect = EffectId.Deserialize(serializedEffectId); var status = (WorkStatus)parts[1]![0]; var result = parts[2]; var exception = parts.Count > 3 @@ -183,17 +186,20 @@ public static StoredEffect Deserialize(byte[] bytes) : null; var alias = parts.Count > 3 ? parts[4]?.ToStringFromUtf8Bytes() : null; - return new StoredEffect(effect, status, result, exception, alias); + return new StoredEffect(effect, status, result, exception, alias, new SerializedEffectId(serializedEffectId)); } public static StoredEffect CreateState(StoredState storedState) { - var effectId = storedState.StateId.Value.ToEffectId(effectType: EffectType.State); + var effectId = storedState.StateId.Value.ToEffectId(); + var serializedEffectId = effectId.Serialize(); return new StoredEffect( effectId, WorkStatus.Completed, storedState.StateJson, - StoredException: null + StoredException: null, + Alias: null, + SerializedEffectId: serializedEffectId ); } };