From 6c6eb4377d0515510aaa47999110bf0334b38379 Mon Sep 17 00:00:00 2001 From: Karel Frajtak <1510452+kfrajtak@users.noreply.github.com> Date: Mon, 16 Jun 2025 19:59:23 +0200 Subject: [PATCH 1/4] support for executing a sub-workflow added + test --- .../WorkflowCore.Testing.csproj | 1 + .../Interface/IWorkflowModifier.cs | 10 ++ src/WorkflowCore/Models/ExecutionPointer.cs | 4 + .../Primitives/SubWorkflowStepBody.cs | 46 ++++++- src/WorkflowCore/Primitives/WaitFor.cs | 3 +- .../ServiceCollectionExtensions.cs | 1 + .../Services/ExecutionResultProcessor.cs | 10 ++ .../Services/FluentBuilders/StepBuilder.cs | 19 +++ .../FluentBuilders/WorkflowBuilder.cs | 6 + src/WorkflowCore/Services/WorkflowExecutor.cs | 14 ++- src/WorkflowCore/Services/WorkflowHost.cs | 6 + .../Scenarios/ApprovalScenario.cs | 118 ++++++++++++++++++ .../Scenarios/SqlServerApprovalScenario.cs | 16 +++ 13 files changed, 249 insertions(+), 5 deletions(-) create mode 100644 test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs create mode 100644 test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs diff --git a/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj b/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj index 6102c9892..58ba7d4a9 100644 --- a/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj +++ b/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj @@ -12,6 +12,7 @@ + diff --git a/src/WorkflowCore/Interface/IWorkflowModifier.cs b/src/WorkflowCore/Interface/IWorkflowModifier.cs index 835855e97..5597b4f43 100644 --- a/src/WorkflowCore/Interface/IWorkflowModifier.cs +++ b/src/WorkflowCore/Interface/IWorkflowModifier.cs @@ -183,5 +183,15 @@ IStepBuilder Activity(string activityName, Expression IStepBuilder Activity(Expression> activityName, Expression> parameters = null, Expression> effectiveDate = null, Expression> cancelCondition = null); + + /// + /// Execute a sub-workflow + /// + /// Id of the sub-workflow to start + /// The data to pass to the sub-workflow + /// A condition that when true will cancel this sub-workflow + /// + IStepBuilder SubWorkflow(string subWorkflowId, Expression> parameters = null, + Expression> cancelCondition = null); } } \ No newline at end of file diff --git a/src/WorkflowCore/Models/ExecutionPointer.cs b/src/WorkflowCore/Models/ExecutionPointer.cs index 6c12afda7..01fe516d0 100644 --- a/src/WorkflowCore/Models/ExecutionPointer.cs +++ b/src/WorkflowCore/Models/ExecutionPointer.cs @@ -51,6 +51,10 @@ public IReadOnlyCollection Scope get => _scope; set => _scope = new List(value); } + + public bool IsComplete => Status == PointerStatus.Complete; + + public bool HasChildren => Children?.Count > 0; } public enum PointerStatus diff --git a/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs b/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs index 7d6cd4c9e..6ab3c06b4 100644 --- a/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs +++ b/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs @@ -1,15 +1,57 @@ using System; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using WorkflowCore.Interface; using WorkflowCore.Models; +using WorkflowCore.Models.LifeCycleEvents; namespace WorkflowCore.Primitives { public class SubWorkflowStepBody : StepBody { + private readonly IScopeProvider _scopeProvider; + + public SubWorkflowStepBody(IScopeProvider scopeProvider) + { + _scopeProvider = scopeProvider; + } + public override ExecutionResult Run(IStepExecutionContext context) { - // TODO: What is this supposed to do? - throw new NotImplementedException(); + var eventKey = context.ExecutionPointer.EventKey; + + var scope = _scopeProvider.CreateScope(context); + var workflowController = scope.ServiceProvider.GetRequiredService(); + var logger = scope.ServiceProvider.GetRequiredService().CreateLogger( + typeof(SubWorkflowStepBody).Namespace + "." + nameof(SubWorkflowStepBody)); + + if (!context.ExecutionPointer.EventPublished) + { + var result = workflowController.StartWorkflow(SubWorkflowId, context.Workflow.Data, context.Workflow.Id).Result; + + logger.LogDebug("Started sub workflow {Name} with id='{SubId}' from workflow {WorkflowDefinitionId} ({Id})", + SubWorkflowId, result, context.Workflow.WorkflowDefinitionId, context.Workflow.Id); + + logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event WorkflowCompleted with key='{EventKey}'", + SubWorkflowId, result, result); + + var effectiveDate = DateTime.MinValue; + return ExecutionResult.WaitForEvent(nameof(WorkflowCompleted), result, effectiveDate); + } + + logger.LogDebug("Sub workflow {Name} ({SubId}) completed", SubWorkflowId, + context.ExecutionPointer.EventKey); + + var persistenceProvider = scope.ServiceProvider.GetRequiredService(); + + Result = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result.Data; + return ExecutionResult.Next(); } + + public string SubWorkflowId { get; set; } + + public object Parameters { get; set; } + + public object Result { get; set; } } } diff --git a/src/WorkflowCore/Primitives/WaitFor.cs b/src/WorkflowCore/Primitives/WaitFor.cs index 7f84be2cd..35065ff56 100644 --- a/src/WorkflowCore/Primitives/WaitFor.cs +++ b/src/WorkflowCore/Primitives/WaitFor.cs @@ -25,7 +25,8 @@ public override ExecutionResult Run(IStepExecutionContext context) effectiveDate = EffectiveDate; } - return ExecutionResult.WaitForEvent(EventName, EventKey, effectiveDate); + var eventKey = context.Workflow.Reference ?? EventKey; + return ExecutionResult.WaitForEvent(EventName, eventKey, effectiveDate); } EventData = context.ExecutionPointer.EventData; diff --git a/src/WorkflowCore/ServiceCollectionExtensions.cs b/src/WorkflowCore/ServiceCollectionExtensions.cs index f38f44a5b..18d710e1f 100644 --- a/src/WorkflowCore/ServiceCollectionExtensions.cs +++ b/src/WorkflowCore/ServiceCollectionExtensions.cs @@ -81,6 +81,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A services.AddTransient(); services.AddTransient(); + services.AddTransient(); return services; } diff --git a/src/WorkflowCore/Services/ExecutionResultProcessor.cs b/src/WorkflowCore/Services/ExecutionResultProcessor.cs index 3c684945f..c44f8e74d 100755 --- a/src/WorkflowCore/Services/ExecutionResultProcessor.cs +++ b/src/WorkflowCore/Services/ExecutionResultProcessor.cs @@ -29,12 +29,16 @@ public ExecutionResultProcessor(IExecutionPointerFactory pointerFactory, IDateTi public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult) { + var stepInfo = $"{step.Name ?? step.BodyType.Name} ({step.Id})"; + pointer.PersistenceData = result.PersistenceData; pointer.Outcome = result.OutcomeValue; if (result.SleepFor.HasValue) { pointer.SleepUntil = _datetimeProvider.UtcNow.Add(result.SleepFor.Value); pointer.Status = PointerStatus.Sleeping; + _logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) will sleep for {SleepUntil}", + stepInfo, workflow.WorkflowDefinitionId, workflow.Id, result.SleepFor.Value); } if (!string.IsNullOrEmpty(result.EventName)) @@ -54,6 +58,9 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition SubscribeAsOf = result.EventAsOf, SubscriptionData = result.SubscriptionData }); + + _logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) waiting for event {EventName}", + stepInfo, workflow.WorkflowDefinitionId, workflow.Id, pointer.EventName); } if (result.Proceed) @@ -87,6 +94,9 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition WorkflowDefinitionId = workflow.WorkflowDefinitionId, Version = workflow.Version }); + + _logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) completed", + stepInfo, workflow.WorkflowDefinitionId, workflow.Id); } else { diff --git a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs index 9e41f15ca..9a2e12e50 100644 --- a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs +++ b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs @@ -588,5 +588,24 @@ public IStepBuilder Activity(Expression SubWorkflow( + string subWorkflowId, + Expression> parameters = null, + Expression> cancelCondition = null) + { + var newStep = new WorkflowStep(); + newStep.CancelCondition = cancelCondition; + + WorkflowBuilder.AddStep(newStep); + var stepBuilder = new StepBuilder(WorkflowBuilder, newStep); + stepBuilder.Input((step) => step.SubWorkflowId, (data) => subWorkflowId); + + if (parameters != null) + stepBuilder.Input((step) => step.Parameters, parameters); + + Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id }); + return stepBuilder; + } } } diff --git a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs index adc02f488..145e4205d 100644 --- a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs +++ b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs @@ -296,10 +296,16 @@ public IStepBuilder Activity(string activityName, Expression Activity(Expression> activityName, Expression> parameters = null, Expression> effectiveDate = null, Expression> cancelCondition = null) { return Start().Activity(activityName, parameters, effectiveDate, cancelCondition); } + + public IStepBuilder SubWorkflow(string subWorkflowId, Expression> parameters = null, Expression> cancelCondition = null) + { + return Start().SubWorkflow(subWorkflowId, parameters, cancelCondition); + } private IStepBuilder Start() { diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index da3e9cd85..7d8e1000a 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -156,9 +156,11 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe CancellationToken = cancellationToken }; + var stepInfo = $"{step.Name ?? step.BodyType.Name} ({step.Id})"; + using (var scope = _scopeProvider.CreateScope(context)) { - _logger.LogDebug("Starting step {StepName} on workflow {WorkflowId}", step.Name, workflow.Id); + _logger.LogDebug("Starting step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId})", stepInfo, workflow.WorkflowDefinitionId, workflow.Id); IStepBody body = step.ConstructBody(scope.ServiceProvider); var stepExecutor = scope.ServiceProvider.GetRequiredService(); @@ -221,6 +223,7 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo if (workflow.Status == WorkflowStatus.Complete) { + await OnComplete(workflow, def); return; } @@ -236,7 +239,7 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo workflow.NextExecution = Math.Min(pointerSleep, workflow.NextExecution ?? pointerSleep); } - foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && (x.Children ?? new List()).Count > 0)) + foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && x.HasChildren)) { if (!workflow.ExecutionPointers.FindByScope(pointer.Id).All(x => x.EndTime.HasValue)) continue; @@ -256,6 +259,11 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo return; } + await OnComplete(workflow, def); + } + + private async Task OnComplete(WorkflowInstance workflow, WorkflowDefinition def) + { workflow.Status = WorkflowStatus.Complete; workflow.CompleteTime = _datetimeProvider.UtcNow; @@ -264,6 +272,8 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo var middlewareRunner = scope.ServiceProvider.GetRequiredService(); await middlewareRunner.RunPostMiddleware(workflow, def); } + + _logger.LogDebug("Workflow {WorkflowDefinitionId} ({Id}) completed", workflow.WorkflowDefinitionId, workflow.Id); _publisher.PublishNotification(new WorkflowCompleted { diff --git a/src/WorkflowCore/Services/WorkflowHost.cs b/src/WorkflowCore/Services/WorkflowHost.cs index 73c8850fa..0fe74b770 100644 --- a/src/WorkflowCore/Services/WorkflowHost.cs +++ b/src/WorkflowCore/Services/WorkflowHost.cs @@ -165,6 +165,12 @@ public Task TerminateWorkflow(string workflowId) public void HandleLifeCycleEvent(LifeCycleEvent evt) { + if (evt is WorkflowCompleted completed) + { + _workflowController.PublishEvent(nameof(WorkflowCompleted), completed.WorkflowInstanceId, + completed.Reference); + } + OnLifeCycleEvent?.Invoke(evt); } diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs new file mode 100644 index 000000000..13f8ee89d --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs @@ -0,0 +1,118 @@ +using System; +using FluentAssertions; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using WorkflowCore.Testing; +using Xunit; +using Xunit.Abstractions; + +namespace WorkflowCore.IntegrationTests.Scenarios; + +public class ApprovalScenario : WorkflowTest +{ + public class ApprovalInput + { + public string Id { get; set; } + public bool Approved { get; set; } + public TimeSpan TimeSpan { get; set; } + public string Message { get; set; } + } + + public class ParentWorkflow : IWorkflow + { + public string Id => nameof(ParentWorkflow); + + public int Version => 1; + + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(context => ExecutionResult.Next()) + .SubWorkflow(nameof(ChildWorkflow)) + .Output(i => i.Approved, step => ((ApprovalInput)step.Result).Approved) + /* + * this does throw an exception + .If(data => data.Approved) + .Do(then => + ExecutionResult.Outcome(1248))*/; + } + } + + public class ChildWorkflow : IWorkflow + { + public string Id => nameof(ChildWorkflow); + + public int Version => 1; + + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(context => ExecutionResult.Next()) + .Parallel() + .Do(then + => then + .Delay(i => i.TimeSpan) + .Output(i => i.Approved, step => false) + .EndWorkflow() + ) + .Do(then + => then + .WaitFor("Approved", e => e.Id) + .Output(i => i.Approved, step => step.EventData) + .EndWorkflow() + ) + .Join(); + } + } + + public ApprovalScenario() + { + Setup(); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void Scenario(bool approved) + { + Host.Registry.RegisterWorkflow(new ChildWorkflow()); + + var eventKey = Guid.NewGuid().ToString(); + var workflowId = StartWorkflow(new ApprovalInput + { + Id = eventKey, + TimeSpan = TimeSpan.FromMinutes(10) + }); + + WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(5)); + UnhandledStepErrors.Should().BeEmpty(); + + Host.PublishEvent("Approved", workflowId, approved); + + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(10)); + + System.Threading.Thread.Sleep(2000); + + UnhandledStepErrors.Should().BeEmpty(); + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + GetData(workflowId).Approved.Should().Be(approved); + } + + [Fact] + public void Timeout() + { + Host.Registry.RegisterWorkflow(new ChildWorkflow()); + + var workflowId = StartWorkflow(new ApprovalInput + { + Id = Guid.NewGuid().ToString(), + TimeSpan = TimeSpan.FromSeconds(5) + }); + WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(2)); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(10)); + + UnhandledStepErrors.Should().BeEmpty(); + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + GetData(workflowId).Approved.Should().BeFalse(); + } +} diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs new file mode 100644 index 000000000..b70314652 --- /dev/null +++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs @@ -0,0 +1,16 @@ +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; +using Xunit.Abstractions; + +namespace WorkflowCore.Tests.SqlServer.Scenarios +{ + [Collection("SqlServer collection")] + public class SqlServerApprovalScenario() : ApprovalScenario() + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseSqlServer(SqlDockerSetup.ScenarioConnectionString, true, true)); + } + } +} From de83dc55a66b1d7f384cf457ea0b2d3abf03997a Mon Sep 17 00:00:00 2001 From: Karel Frajtak <1510452+kfrajtak@users.noreply.github.com> Date: Sat, 28 Jun 2025 16:03:03 +0200 Subject: [PATCH 2/4] sub workflow result propagation as output to parent --- .../Services/FluentBuilders/StepBuilder.cs | 10 +++++++++ .../Scenarios/ApprovalScenario.cs | 22 ++++++++++++++++--- .../Scenarios/SqliteApprovalScenario.cs | 21 ++++++++++++++++++ 3 files changed, 50 insertions(+), 3 deletions(-) create mode 100644 test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs diff --git a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs index 9a2e12e50..2673cdbd8 100644 --- a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs +++ b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs @@ -603,6 +603,16 @@ public IStepBuilder SubWorkflow( if (parameters != null) stepBuilder.Input((step) => step.Parameters, parameters); + + // use the result of the sub workflow as an output + // merge it with parent workflow data + stepBuilder.Output((body, data) => + { + foreach (var prop in typeof(TData).GetProperties()) + { + prop.SetValue(data, prop.GetValue(body.Result)); + } + }); Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id }); return stepBuilder; diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs index 13f8ee89d..dced900c5 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs @@ -1,5 +1,6 @@ using System; using FluentAssertions; +using Newtonsoft.Json.Linq; using WorkflowCore.Interface; using WorkflowCore.Models; using WorkflowCore.Testing; @@ -58,7 +59,12 @@ public void Build(IWorkflowBuilder builder) .Do(then => then .WaitFor("Approved", e => e.Id) - .Output(i => i.Approved, step => step.EventData) + .Output((w, input) => + { + var j = JObject.FromObject(w.EventData); + input.Approved = j["Approved"].Value(); + input.Message= j["Message"].Value(); + }) .EndWorkflow() ) .Join(); @@ -87,7 +93,11 @@ public void Scenario(bool approved) WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(5)); UnhandledStepErrors.Should().BeEmpty(); - Host.PublishEvent("Approved", workflowId, approved); + Host.PublishEvent("Approved", workflowId, new + { + Approved = approved, + Message = "message " + approved + }); WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(10)); @@ -95,7 +105,13 @@ public void Scenario(bool approved) UnhandledStepErrors.Should().BeEmpty(); GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); - GetData(workflowId).Approved.Should().Be(approved); + GetData(workflowId).ShouldBeEquivalentTo(new ApprovalInput + { + Id = eventKey, + Approved = approved, + Message = "message " + approved, + TimeSpan = TimeSpan.FromMinutes(10) + }); } [Fact] diff --git a/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs new file mode 100644 index 000000000..c910c201c --- /dev/null +++ b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs @@ -0,0 +1,21 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; +using Xunit.Abstractions; + +namespace WorkflowCore.Tests.Sqlite.Scenarios +{ + [Collection("Sqlite collection")] + public class SqliteApprovalScenario : ApprovalScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(cfg => + { + cfg.UseSqlite($"Data Source=wfc-tests-{DateTime.Now.Ticks}.db;", true); + cfg.UsePollInterval(TimeSpan.FromSeconds(2)); + }); + } + } +} \ No newline at end of file From b41bc0dbf3e0419ddf01cc6eb4f9aaef4b85cb54 Mon Sep 17 00:00:00 2001 From: Karel Frajtak <1510452+kfrajtak@users.noreply.github.com> Date: Sun, 29 Jun 2025 13:28:13 +0200 Subject: [PATCH 3/4] failure in sub workflow will terminate the parent workflow --- .../SubWorkflowLifeCycleEvent.cs | 7 ++++ .../Primitives/SubWorkflowStepBody.cs | 15 ++++---- src/WorkflowCore/Services/WorkflowExecutor.cs | 29 ++++++++++++++++ src/WorkflowCore/Services/WorkflowHost.cs | 9 +++-- .../EntityFrameworkPersistenceProvider.cs | 4 +++ ...ovalScenario.cs => SubWorkflowScenario.cs} | 34 +++++++++++++++++-- ...rio.cs => SqlServerSubWorkflowScenario.cs} | 2 +- ...enario.cs => SqliteSubWorkflowScenario.cs} | 2 +- 8 files changed, 89 insertions(+), 13 deletions(-) create mode 100644 src/WorkflowCore/Models/LifeCycleEvents/SubWorkflowLifeCycleEvent.cs rename test/WorkflowCore.IntegrationTests/Scenarios/{ApprovalScenario.cs => SubWorkflowScenario.cs} (78%) rename test/WorkflowCore.Tests.SqlServer/Scenarios/{SqlServerApprovalScenario.cs => SqlServerSubWorkflowScenario.cs} (86%) rename test/WorkflowCore.Tests.Sqlite/Scenarios/{SqliteApprovalScenario.cs => SqliteSubWorkflowScenario.cs} (89%) diff --git a/src/WorkflowCore/Models/LifeCycleEvents/SubWorkflowLifeCycleEvent.cs b/src/WorkflowCore/Models/LifeCycleEvents/SubWorkflowLifeCycleEvent.cs new file mode 100644 index 000000000..cbb7cc4e5 --- /dev/null +++ b/src/WorkflowCore/Models/LifeCycleEvents/SubWorkflowLifeCycleEvent.cs @@ -0,0 +1,7 @@ +namespace WorkflowCore.Models.LifeCycleEvents +{ + public class SubWorkflowLifeCycleEvent : LifeCycleEvent + { + + } +} \ No newline at end of file diff --git a/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs b/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs index 6ab3c06b4..b2ac30a30 100644 --- a/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs +++ b/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs @@ -18,8 +18,6 @@ public SubWorkflowStepBody(IScopeProvider scopeProvider) public override ExecutionResult Run(IStepExecutionContext context) { - var eventKey = context.ExecutionPointer.EventKey; - var scope = _scopeProvider.CreateScope(context); var workflowController = scope.ServiceProvider.GetRequiredService(); var logger = scope.ServiceProvider.GetRequiredService().CreateLogger( @@ -32,19 +30,24 @@ public override ExecutionResult Run(IStepExecutionContext context) logger.LogDebug("Started sub workflow {Name} with id='{SubId}' from workflow {WorkflowDefinitionId} ({Id})", SubWorkflowId, result, context.Workflow.WorkflowDefinitionId, context.Workflow.Id); - logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event WorkflowCompleted with key='{EventKey}'", + logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event SubWorkflowLifeCycleEvent with key='{EventKey}'", SubWorkflowId, result, result); var effectiveDate = DateTime.MinValue; - return ExecutionResult.WaitForEvent(nameof(WorkflowCompleted), result, effectiveDate); + return ExecutionResult.WaitForEvent(nameof(SubWorkflowLifeCycleEvent), result, effectiveDate); } logger.LogDebug("Sub workflow {Name} ({SubId}) completed", SubWorkflowId, context.ExecutionPointer.EventKey); var persistenceProvider = scope.ServiceProvider.GetRequiredService(); - - Result = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result.Data; + var workflowInstance = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result; + if (workflowInstance.Status == WorkflowStatus.Terminated) + { + throw new NotImplementedException(workflowInstance.Status.ToString()); + } + + Result = workflowInstance.Data; return ExecutionResult.Next(); } diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index 7d8e1000a..0cdff408a 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -227,6 +227,12 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo return; } + if (workflow.Status == WorkflowStatus.Terminated) + { + await OnTerminated(workflow, def); + return; + } + foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && (x.Children ?? new List()).Count == 0)) { if (!pointer.SleepUntil.HasValue) @@ -284,5 +290,28 @@ private async Task OnComplete(WorkflowInstance workflow, WorkflowDefinition def) Version = workflow.Version }); } + + private async Task OnTerminated(WorkflowInstance workflow, WorkflowDefinition def) + { + workflow.Status = WorkflowStatus.Terminated; + workflow.CompleteTime = _datetimeProvider.UtcNow; + + using (var scope = _serviceProvider.CreateScope()) + { + var middlewareRunner = scope.ServiceProvider.GetRequiredService(); + await middlewareRunner.RunPostMiddleware(workflow, def); + } + + _logger.LogDebug("Workflow {WorkflowDefinitionId} ({Id}) terminated", workflow.WorkflowDefinitionId, workflow.Id); + + _publisher.PublishNotification(new WorkflowTerminated + { + EventTimeUtc = _datetimeProvider.UtcNow, + Reference = workflow.Reference, + WorkflowInstanceId = workflow.Id, + WorkflowDefinitionId = workflow.WorkflowDefinitionId, + Version = workflow.Version + }); + } } } \ No newline at end of file diff --git a/src/WorkflowCore/Services/WorkflowHost.cs b/src/WorkflowCore/Services/WorkflowHost.cs index 0fe74b770..6fc8caf69 100644 --- a/src/WorkflowCore/Services/WorkflowHost.cs +++ b/src/WorkflowCore/Services/WorkflowHost.cs @@ -165,10 +165,13 @@ public Task TerminateWorkflow(string workflowId) public void HandleLifeCycleEvent(LifeCycleEvent evt) { - if (evt is WorkflowCompleted completed) + switch (evt) { - _workflowController.PublishEvent(nameof(WorkflowCompleted), completed.WorkflowInstanceId, - completed.Reference); + // publish the event as sub workflow lifecycle event + case WorkflowCompleted _: + case WorkflowTerminated _: + _workflowController.PublishEvent(nameof(SubWorkflowLifeCycleEvent), evt.WorkflowInstanceId, evt.Reference); + break; } OnLifeCycleEvent?.Invoke(evt); diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs index 22ba680f5..b06f275b4 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs @@ -100,6 +100,10 @@ public async Task GetWorkflowInstance(string Id, CancellationT { using (var db = ConstructDbContext()) { + if (!Guid.TryParse(Id, out _)) + { + + } var uid = new Guid(Id); var raw = await db.Set() .Include(wf => wf.ExecutionPointers) diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/SubWorkflowScenario.cs similarity index 78% rename from test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs rename to test/WorkflowCore.IntegrationTests/Scenarios/SubWorkflowScenario.cs index dced900c5..dc87c60bd 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/SubWorkflowScenario.cs @@ -9,7 +9,7 @@ namespace WorkflowCore.IntegrationTests.Scenarios; -public class ApprovalScenario : WorkflowTest +public class SubWorkflowScenario : WorkflowTest { public class ApprovalInput { @@ -28,6 +28,7 @@ public class ParentWorkflow : IWorkflow public void Build(IWorkflowBuilder builder) { builder + .UseDefaultErrorBehavior(WorkflowErrorHandling.Terminate) .StartWith(context => ExecutionResult.Next()) .SubWorkflow(nameof(ChildWorkflow)) .Output(i => i.Approved, step => ((ApprovalInput)step.Result).Approved) @@ -48,6 +49,7 @@ public class ChildWorkflow : IWorkflow public void Build(IWorkflowBuilder builder) { builder + .UseDefaultErrorBehavior(WorkflowErrorHandling.Terminate) .StartWith(context => ExecutionResult.Next()) .Parallel() .Do(then @@ -71,7 +73,7 @@ public void Build(IWorkflowBuilder builder) } } - public ApprovalScenario() + public SubWorkflowScenario() { Setup(); } @@ -114,6 +116,34 @@ public void Scenario(bool approved) }); } + [Fact] + public void Failure() + { + Host.Registry.RegisterWorkflow(new ChildWorkflow()); + + var eventKey = Guid.NewGuid().ToString(); + var workflowId = StartWorkflow(new ApprovalInput + { + Id = eventKey, + TimeSpan = TimeSpan.FromMinutes(10) + }); + + WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(5)); + UnhandledStepErrors.Should().BeEmpty(); + + Host.PublishEvent("Approved", workflowId, new + { + Approved = "string" + }); + + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(20)); + + System.Threading.Thread.Sleep(2000); + + UnhandledStepErrors.Should().NotBeEmpty(); + GetStatus(workflowId).Should().Be(WorkflowStatus.Terminated); + } + [Fact] public void Timeout() { diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSubWorkflowScenario.cs similarity index 86% rename from test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs rename to test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSubWorkflowScenario.cs index b70314652..6fc07c5a6 100644 --- a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs +++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSubWorkflowScenario.cs @@ -6,7 +6,7 @@ namespace WorkflowCore.Tests.SqlServer.Scenarios { [Collection("SqlServer collection")] - public class SqlServerApprovalScenario() : ApprovalScenario() + public class SqlServerSubWorkflowScenario() : SubWorkflowScenario() { protected override void ConfigureServices(IServiceCollection services) { diff --git a/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteSubWorkflowScenario.cs similarity index 89% rename from test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs rename to test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteSubWorkflowScenario.cs index c910c201c..d1e2ae06a 100644 --- a/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs +++ b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteSubWorkflowScenario.cs @@ -7,7 +7,7 @@ namespace WorkflowCore.Tests.Sqlite.Scenarios { [Collection("Sqlite collection")] - public class SqliteApprovalScenario : ApprovalScenario + public class SqliteSubWorkflowScenario : SubWorkflowScenario { protected override void ConfigureServices(IServiceCollection services) { From de2a6d5deadc0b4c46f1d2d14497df72f6031615 Mon Sep 17 00:00:00 2001 From: Karel Frajtak <1510452+kfrajtak@users.noreply.github.com> Date: Mon, 30 Jun 2025 09:50:18 +0200 Subject: [PATCH 4/4] ExecutionError.Message will contain full exception information --- src/WorkflowCore/Services/WorkflowExecutor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index 0cdff408a..205e30945 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -89,7 +89,7 @@ public async Task Execute(WorkflowInstance workflow, Can WorkflowId = workflow.Id, ExecutionPointerId = pointer.Id, ErrorTime = _datetimeProvider.UtcNow, - Message = ex.Message + Message = ex.ToString() }); _executionResultProcessor.HandleStepException(workflow, def, pointer, step, ex);