From 6c6eb4377d0515510aaa47999110bf0334b38379 Mon Sep 17 00:00:00 2001
From: Karel Frajtak <1510452+kfrajtak@users.noreply.github.com>
Date: Mon, 16 Jun 2025 19:59:23 +0200
Subject: [PATCH 1/4] support for executing a sub-workflow added + test
---
.../WorkflowCore.Testing.csproj | 1 +
.../Interface/IWorkflowModifier.cs | 10 ++
src/WorkflowCore/Models/ExecutionPointer.cs | 4 +
.../Primitives/SubWorkflowStepBody.cs | 46 ++++++-
src/WorkflowCore/Primitives/WaitFor.cs | 3 +-
.../ServiceCollectionExtensions.cs | 1 +
.../Services/ExecutionResultProcessor.cs | 10 ++
.../Services/FluentBuilders/StepBuilder.cs | 19 +++
.../FluentBuilders/WorkflowBuilder.cs | 6 +
src/WorkflowCore/Services/WorkflowExecutor.cs | 14 ++-
src/WorkflowCore/Services/WorkflowHost.cs | 6 +
.../Scenarios/ApprovalScenario.cs | 118 ++++++++++++++++++
.../Scenarios/SqlServerApprovalScenario.cs | 16 +++
13 files changed, 249 insertions(+), 5 deletions(-)
create mode 100644 test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs
create mode 100644 test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs
diff --git a/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj b/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj
index 6102c9892..58ba7d4a9 100644
--- a/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj
+++ b/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj
@@ -12,6 +12,7 @@
+
diff --git a/src/WorkflowCore/Interface/IWorkflowModifier.cs b/src/WorkflowCore/Interface/IWorkflowModifier.cs
index 835855e97..5597b4f43 100644
--- a/src/WorkflowCore/Interface/IWorkflowModifier.cs
+++ b/src/WorkflowCore/Interface/IWorkflowModifier.cs
@@ -183,5 +183,15 @@ IStepBuilder Activity(string activityName, Expression
IStepBuilder Activity(Expression> activityName, Expression> parameters = null,
Expression> effectiveDate = null, Expression> cancelCondition = null);
+
+ ///
+ /// Execute a sub-workflow
+ ///
+ /// Id of the sub-workflow to start
+ /// The data to pass to the sub-workflow
+ /// A condition that when true will cancel this sub-workflow
+ ///
+ IStepBuilder SubWorkflow(string subWorkflowId, Expression> parameters = null,
+ Expression> cancelCondition = null);
}
}
\ No newline at end of file
diff --git a/src/WorkflowCore/Models/ExecutionPointer.cs b/src/WorkflowCore/Models/ExecutionPointer.cs
index 6c12afda7..01fe516d0 100644
--- a/src/WorkflowCore/Models/ExecutionPointer.cs
+++ b/src/WorkflowCore/Models/ExecutionPointer.cs
@@ -51,6 +51,10 @@ public IReadOnlyCollection Scope
get => _scope;
set => _scope = new List(value);
}
+
+ public bool IsComplete => Status == PointerStatus.Complete;
+
+ public bool HasChildren => Children?.Count > 0;
}
public enum PointerStatus
diff --git a/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs b/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs
index 7d6cd4c9e..6ab3c06b4 100644
--- a/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs
+++ b/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs
@@ -1,15 +1,57 @@
using System;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
using WorkflowCore.Interface;
using WorkflowCore.Models;
+using WorkflowCore.Models.LifeCycleEvents;
namespace WorkflowCore.Primitives
{
public class SubWorkflowStepBody : StepBody
{
+ private readonly IScopeProvider _scopeProvider;
+
+ public SubWorkflowStepBody(IScopeProvider scopeProvider)
+ {
+ _scopeProvider = scopeProvider;
+ }
+
public override ExecutionResult Run(IStepExecutionContext context)
{
- // TODO: What is this supposed to do?
- throw new NotImplementedException();
+ var eventKey = context.ExecutionPointer.EventKey;
+
+ var scope = _scopeProvider.CreateScope(context);
+ var workflowController = scope.ServiceProvider.GetRequiredService();
+ var logger = scope.ServiceProvider.GetRequiredService().CreateLogger(
+ typeof(SubWorkflowStepBody).Namespace + "." + nameof(SubWorkflowStepBody));
+
+ if (!context.ExecutionPointer.EventPublished)
+ {
+ var result = workflowController.StartWorkflow(SubWorkflowId, context.Workflow.Data, context.Workflow.Id).Result;
+
+ logger.LogDebug("Started sub workflow {Name} with id='{SubId}' from workflow {WorkflowDefinitionId} ({Id})",
+ SubWorkflowId, result, context.Workflow.WorkflowDefinitionId, context.Workflow.Id);
+
+ logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event WorkflowCompleted with key='{EventKey}'",
+ SubWorkflowId, result, result);
+
+ var effectiveDate = DateTime.MinValue;
+ return ExecutionResult.WaitForEvent(nameof(WorkflowCompleted), result, effectiveDate);
+ }
+
+ logger.LogDebug("Sub workflow {Name} ({SubId}) completed", SubWorkflowId,
+ context.ExecutionPointer.EventKey);
+
+ var persistenceProvider = scope.ServiceProvider.GetRequiredService();
+
+ Result = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result.Data;
+ return ExecutionResult.Next();
}
+
+ public string SubWorkflowId { get; set; }
+
+ public object Parameters { get; set; }
+
+ public object Result { get; set; }
}
}
diff --git a/src/WorkflowCore/Primitives/WaitFor.cs b/src/WorkflowCore/Primitives/WaitFor.cs
index 7f84be2cd..35065ff56 100644
--- a/src/WorkflowCore/Primitives/WaitFor.cs
+++ b/src/WorkflowCore/Primitives/WaitFor.cs
@@ -25,7 +25,8 @@ public override ExecutionResult Run(IStepExecutionContext context)
effectiveDate = EffectiveDate;
}
- return ExecutionResult.WaitForEvent(EventName, EventKey, effectiveDate);
+ var eventKey = context.Workflow.Reference ?? EventKey;
+ return ExecutionResult.WaitForEvent(EventName, eventKey, effectiveDate);
}
EventData = context.ExecutionPointer.EventData;
diff --git a/src/WorkflowCore/ServiceCollectionExtensions.cs b/src/WorkflowCore/ServiceCollectionExtensions.cs
index f38f44a5b..18d710e1f 100644
--- a/src/WorkflowCore/ServiceCollectionExtensions.cs
+++ b/src/WorkflowCore/ServiceCollectionExtensions.cs
@@ -81,6 +81,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A
services.AddTransient();
services.AddTransient();
+ services.AddTransient();
return services;
}
diff --git a/src/WorkflowCore/Services/ExecutionResultProcessor.cs b/src/WorkflowCore/Services/ExecutionResultProcessor.cs
index 3c684945f..c44f8e74d 100755
--- a/src/WorkflowCore/Services/ExecutionResultProcessor.cs
+++ b/src/WorkflowCore/Services/ExecutionResultProcessor.cs
@@ -29,12 +29,16 @@ public ExecutionResultProcessor(IExecutionPointerFactory pointerFactory, IDateTi
public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult)
{
+ var stepInfo = $"{step.Name ?? step.BodyType.Name} ({step.Id})";
+
pointer.PersistenceData = result.PersistenceData;
pointer.Outcome = result.OutcomeValue;
if (result.SleepFor.HasValue)
{
pointer.SleepUntil = _datetimeProvider.UtcNow.Add(result.SleepFor.Value);
pointer.Status = PointerStatus.Sleeping;
+ _logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) will sleep for {SleepUntil}",
+ stepInfo, workflow.WorkflowDefinitionId, workflow.Id, result.SleepFor.Value);
}
if (!string.IsNullOrEmpty(result.EventName))
@@ -54,6 +58,9 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition
SubscribeAsOf = result.EventAsOf,
SubscriptionData = result.SubscriptionData
});
+
+ _logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) waiting for event {EventName}",
+ stepInfo, workflow.WorkflowDefinitionId, workflow.Id, pointer.EventName);
}
if (result.Proceed)
@@ -87,6 +94,9 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
Version = workflow.Version
});
+
+ _logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) completed",
+ stepInfo, workflow.WorkflowDefinitionId, workflow.Id);
}
else
{
diff --git a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs
index 9e41f15ca..9a2e12e50 100644
--- a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs
+++ b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs
@@ -588,5 +588,24 @@ public IStepBuilder Activity(Expression SubWorkflow(
+ string subWorkflowId,
+ Expression> parameters = null,
+ Expression> cancelCondition = null)
+ {
+ var newStep = new WorkflowStep();
+ newStep.CancelCondition = cancelCondition;
+
+ WorkflowBuilder.AddStep(newStep);
+ var stepBuilder = new StepBuilder(WorkflowBuilder, newStep);
+ stepBuilder.Input((step) => step.SubWorkflowId, (data) => subWorkflowId);
+
+ if (parameters != null)
+ stepBuilder.Input((step) => step.Parameters, parameters);
+
+ Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id });
+ return stepBuilder;
+ }
}
}
diff --git a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs
index adc02f488..145e4205d 100644
--- a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs
+++ b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs
@@ -296,10 +296,16 @@ public IStepBuilder Activity(string activityName, Expression Activity(Expression> activityName, Expression> parameters = null, Expression> effectiveDate = null, Expression> cancelCondition = null)
{
return Start().Activity(activityName, parameters, effectiveDate, cancelCondition);
}
+
+ public IStepBuilder SubWorkflow(string subWorkflowId, Expression> parameters = null, Expression> cancelCondition = null)
+ {
+ return Start().SubWorkflow(subWorkflowId, parameters, cancelCondition);
+ }
private IStepBuilder Start()
{
diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs
index da3e9cd85..7d8e1000a 100755
--- a/src/WorkflowCore/Services/WorkflowExecutor.cs
+++ b/src/WorkflowCore/Services/WorkflowExecutor.cs
@@ -156,9 +156,11 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe
CancellationToken = cancellationToken
};
+ var stepInfo = $"{step.Name ?? step.BodyType.Name} ({step.Id})";
+
using (var scope = _scopeProvider.CreateScope(context))
{
- _logger.LogDebug("Starting step {StepName} on workflow {WorkflowId}", step.Name, workflow.Id);
+ _logger.LogDebug("Starting step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId})", stepInfo, workflow.WorkflowDefinitionId, workflow.Id);
IStepBody body = step.ConstructBody(scope.ServiceProvider);
var stepExecutor = scope.ServiceProvider.GetRequiredService();
@@ -221,6 +223,7 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
if (workflow.Status == WorkflowStatus.Complete)
{
+ await OnComplete(workflow, def);
return;
}
@@ -236,7 +239,7 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
workflow.NextExecution = Math.Min(pointerSleep, workflow.NextExecution ?? pointerSleep);
}
- foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && (x.Children ?? new List()).Count > 0))
+ foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && x.HasChildren))
{
if (!workflow.ExecutionPointers.FindByScope(pointer.Id).All(x => x.EndTime.HasValue))
continue;
@@ -256,6 +259,11 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
return;
}
+ await OnComplete(workflow, def);
+ }
+
+ private async Task OnComplete(WorkflowInstance workflow, WorkflowDefinition def)
+ {
workflow.Status = WorkflowStatus.Complete;
workflow.CompleteTime = _datetimeProvider.UtcNow;
@@ -264,6 +272,8 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
var middlewareRunner = scope.ServiceProvider.GetRequiredService();
await middlewareRunner.RunPostMiddleware(workflow, def);
}
+
+ _logger.LogDebug("Workflow {WorkflowDefinitionId} ({Id}) completed", workflow.WorkflowDefinitionId, workflow.Id);
_publisher.PublishNotification(new WorkflowCompleted
{
diff --git a/src/WorkflowCore/Services/WorkflowHost.cs b/src/WorkflowCore/Services/WorkflowHost.cs
index 73c8850fa..0fe74b770 100644
--- a/src/WorkflowCore/Services/WorkflowHost.cs
+++ b/src/WorkflowCore/Services/WorkflowHost.cs
@@ -165,6 +165,12 @@ public Task TerminateWorkflow(string workflowId)
public void HandleLifeCycleEvent(LifeCycleEvent evt)
{
+ if (evt is WorkflowCompleted completed)
+ {
+ _workflowController.PublishEvent(nameof(WorkflowCompleted), completed.WorkflowInstanceId,
+ completed.Reference);
+ }
+
OnLifeCycleEvent?.Invoke(evt);
}
diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs
new file mode 100644
index 000000000..13f8ee89d
--- /dev/null
+++ b/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs
@@ -0,0 +1,118 @@
+using System;
+using FluentAssertions;
+using WorkflowCore.Interface;
+using WorkflowCore.Models;
+using WorkflowCore.Testing;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace WorkflowCore.IntegrationTests.Scenarios;
+
+public class ApprovalScenario : WorkflowTest
+{
+ public class ApprovalInput
+ {
+ public string Id { get; set; }
+ public bool Approved { get; set; }
+ public TimeSpan TimeSpan { get; set; }
+ public string Message { get; set; }
+ }
+
+ public class ParentWorkflow : IWorkflow
+ {
+ public string Id => nameof(ParentWorkflow);
+
+ public int Version => 1;
+
+ public void Build(IWorkflowBuilder builder)
+ {
+ builder
+ .StartWith(context => ExecutionResult.Next())
+ .SubWorkflow(nameof(ChildWorkflow))
+ .Output(i => i.Approved, step => ((ApprovalInput)step.Result).Approved)
+ /*
+ * this does throw an exception
+ .If(data => data.Approved)
+ .Do(then =>
+ ExecutionResult.Outcome(1248))*/;
+ }
+ }
+
+ public class ChildWorkflow : IWorkflow
+ {
+ public string Id => nameof(ChildWorkflow);
+
+ public int Version => 1;
+
+ public void Build(IWorkflowBuilder builder)
+ {
+ builder
+ .StartWith(context => ExecutionResult.Next())
+ .Parallel()
+ .Do(then
+ => then
+ .Delay(i => i.TimeSpan)
+ .Output(i => i.Approved, step => false)
+ .EndWorkflow()
+ )
+ .Do(then
+ => then
+ .WaitFor("Approved", e => e.Id)
+ .Output(i => i.Approved, step => step.EventData)
+ .EndWorkflow()
+ )
+ .Join();
+ }
+ }
+
+ public ApprovalScenario()
+ {
+ Setup();
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public void Scenario(bool approved)
+ {
+ Host.Registry.RegisterWorkflow(new ChildWorkflow());
+
+ var eventKey = Guid.NewGuid().ToString();
+ var workflowId = StartWorkflow(new ApprovalInput
+ {
+ Id = eventKey,
+ TimeSpan = TimeSpan.FromMinutes(10)
+ });
+
+ WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(5));
+ UnhandledStepErrors.Should().BeEmpty();
+
+ Host.PublishEvent("Approved", workflowId, approved);
+
+ WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(10));
+
+ System.Threading.Thread.Sleep(2000);
+
+ UnhandledStepErrors.Should().BeEmpty();
+ GetStatus(workflowId).Should().Be(WorkflowStatus.Complete);
+ GetData(workflowId).Approved.Should().Be(approved);
+ }
+
+ [Fact]
+ public void Timeout()
+ {
+ Host.Registry.RegisterWorkflow(new ChildWorkflow());
+
+ var workflowId = StartWorkflow(new ApprovalInput
+ {
+ Id = Guid.NewGuid().ToString(),
+ TimeSpan = TimeSpan.FromSeconds(5)
+ });
+ WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(2));
+ WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(10));
+
+ UnhandledStepErrors.Should().BeEmpty();
+ GetStatus(workflowId).Should().Be(WorkflowStatus.Complete);
+ GetData(workflowId).Approved.Should().BeFalse();
+ }
+}
diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs
new file mode 100644
index 000000000..b70314652
--- /dev/null
+++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs
@@ -0,0 +1,16 @@
+using Microsoft.Extensions.DependencyInjection;
+using WorkflowCore.IntegrationTests.Scenarios;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace WorkflowCore.Tests.SqlServer.Scenarios
+{
+ [Collection("SqlServer collection")]
+ public class SqlServerApprovalScenario() : ApprovalScenario()
+ {
+ protected override void ConfigureServices(IServiceCollection services)
+ {
+ services.AddWorkflow(x => x.UseSqlServer(SqlDockerSetup.ScenarioConnectionString, true, true));
+ }
+ }
+}
From de83dc55a66b1d7f384cf457ea0b2d3abf03997a Mon Sep 17 00:00:00 2001
From: Karel Frajtak <1510452+kfrajtak@users.noreply.github.com>
Date: Sat, 28 Jun 2025 16:03:03 +0200
Subject: [PATCH 2/4] sub workflow result propagation as output to parent
---
.../Services/FluentBuilders/StepBuilder.cs | 10 +++++++++
.../Scenarios/ApprovalScenario.cs | 22 ++++++++++++++++---
.../Scenarios/SqliteApprovalScenario.cs | 21 ++++++++++++++++++
3 files changed, 50 insertions(+), 3 deletions(-)
create mode 100644 test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs
diff --git a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs
index 9a2e12e50..2673cdbd8 100644
--- a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs
+++ b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs
@@ -603,6 +603,16 @@ public IStepBuilder SubWorkflow(
if (parameters != null)
stepBuilder.Input((step) => step.Parameters, parameters);
+
+ // use the result of the sub workflow as an output
+ // merge it with parent workflow data
+ stepBuilder.Output((body, data) =>
+ {
+ foreach (var prop in typeof(TData).GetProperties())
+ {
+ prop.SetValue(data, prop.GetValue(body.Result));
+ }
+ });
Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id });
return stepBuilder;
diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs
index 13f8ee89d..dced900c5 100644
--- a/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs
+++ b/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs
@@ -1,5 +1,6 @@
using System;
using FluentAssertions;
+using Newtonsoft.Json.Linq;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using WorkflowCore.Testing;
@@ -58,7 +59,12 @@ public void Build(IWorkflowBuilder builder)
.Do(then
=> then
.WaitFor("Approved", e => e.Id)
- .Output(i => i.Approved, step => step.EventData)
+ .Output((w, input) =>
+ {
+ var j = JObject.FromObject(w.EventData);
+ input.Approved = j["Approved"].Value();
+ input.Message= j["Message"].Value();
+ })
.EndWorkflow()
)
.Join();
@@ -87,7 +93,11 @@ public void Scenario(bool approved)
WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(5));
UnhandledStepErrors.Should().BeEmpty();
- Host.PublishEvent("Approved", workflowId, approved);
+ Host.PublishEvent("Approved", workflowId, new
+ {
+ Approved = approved,
+ Message = "message " + approved
+ });
WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(10));
@@ -95,7 +105,13 @@ public void Scenario(bool approved)
UnhandledStepErrors.Should().BeEmpty();
GetStatus(workflowId).Should().Be(WorkflowStatus.Complete);
- GetData(workflowId).Approved.Should().Be(approved);
+ GetData(workflowId).ShouldBeEquivalentTo(new ApprovalInput
+ {
+ Id = eventKey,
+ Approved = approved,
+ Message = "message " + approved,
+ TimeSpan = TimeSpan.FromMinutes(10)
+ });
}
[Fact]
diff --git a/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs
new file mode 100644
index 000000000..c910c201c
--- /dev/null
+++ b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs
@@ -0,0 +1,21 @@
+using System;
+using Microsoft.Extensions.DependencyInjection;
+using WorkflowCore.IntegrationTests.Scenarios;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace WorkflowCore.Tests.Sqlite.Scenarios
+{
+ [Collection("Sqlite collection")]
+ public class SqliteApprovalScenario : ApprovalScenario
+ {
+ protected override void ConfigureServices(IServiceCollection services)
+ {
+ services.AddWorkflow(cfg =>
+ {
+ cfg.UseSqlite($"Data Source=wfc-tests-{DateTime.Now.Ticks}.db;", true);
+ cfg.UsePollInterval(TimeSpan.FromSeconds(2));
+ });
+ }
+ }
+}
\ No newline at end of file
From b41bc0dbf3e0419ddf01cc6eb4f9aaef4b85cb54 Mon Sep 17 00:00:00 2001
From: Karel Frajtak <1510452+kfrajtak@users.noreply.github.com>
Date: Sun, 29 Jun 2025 13:28:13 +0200
Subject: [PATCH 3/4] failure in sub workflow will terminate the parent
workflow
---
.../SubWorkflowLifeCycleEvent.cs | 7 ++++
.../Primitives/SubWorkflowStepBody.cs | 15 ++++----
src/WorkflowCore/Services/WorkflowExecutor.cs | 29 ++++++++++++++++
src/WorkflowCore/Services/WorkflowHost.cs | 9 +++--
.../EntityFrameworkPersistenceProvider.cs | 4 +++
...ovalScenario.cs => SubWorkflowScenario.cs} | 34 +++++++++++++++++--
...rio.cs => SqlServerSubWorkflowScenario.cs} | 2 +-
...enario.cs => SqliteSubWorkflowScenario.cs} | 2 +-
8 files changed, 89 insertions(+), 13 deletions(-)
create mode 100644 src/WorkflowCore/Models/LifeCycleEvents/SubWorkflowLifeCycleEvent.cs
rename test/WorkflowCore.IntegrationTests/Scenarios/{ApprovalScenario.cs => SubWorkflowScenario.cs} (78%)
rename test/WorkflowCore.Tests.SqlServer/Scenarios/{SqlServerApprovalScenario.cs => SqlServerSubWorkflowScenario.cs} (86%)
rename test/WorkflowCore.Tests.Sqlite/Scenarios/{SqliteApprovalScenario.cs => SqliteSubWorkflowScenario.cs} (89%)
diff --git a/src/WorkflowCore/Models/LifeCycleEvents/SubWorkflowLifeCycleEvent.cs b/src/WorkflowCore/Models/LifeCycleEvents/SubWorkflowLifeCycleEvent.cs
new file mode 100644
index 000000000..cbb7cc4e5
--- /dev/null
+++ b/src/WorkflowCore/Models/LifeCycleEvents/SubWorkflowLifeCycleEvent.cs
@@ -0,0 +1,7 @@
+namespace WorkflowCore.Models.LifeCycleEvents
+{
+ public class SubWorkflowLifeCycleEvent : LifeCycleEvent
+ {
+
+ }
+}
\ No newline at end of file
diff --git a/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs b/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs
index 6ab3c06b4..b2ac30a30 100644
--- a/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs
+++ b/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs
@@ -18,8 +18,6 @@ public SubWorkflowStepBody(IScopeProvider scopeProvider)
public override ExecutionResult Run(IStepExecutionContext context)
{
- var eventKey = context.ExecutionPointer.EventKey;
-
var scope = _scopeProvider.CreateScope(context);
var workflowController = scope.ServiceProvider.GetRequiredService();
var logger = scope.ServiceProvider.GetRequiredService().CreateLogger(
@@ -32,19 +30,24 @@ public override ExecutionResult Run(IStepExecutionContext context)
logger.LogDebug("Started sub workflow {Name} with id='{SubId}' from workflow {WorkflowDefinitionId} ({Id})",
SubWorkflowId, result, context.Workflow.WorkflowDefinitionId, context.Workflow.Id);
- logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event WorkflowCompleted with key='{EventKey}'",
+ logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event SubWorkflowLifeCycleEvent with key='{EventKey}'",
SubWorkflowId, result, result);
var effectiveDate = DateTime.MinValue;
- return ExecutionResult.WaitForEvent(nameof(WorkflowCompleted), result, effectiveDate);
+ return ExecutionResult.WaitForEvent(nameof(SubWorkflowLifeCycleEvent), result, effectiveDate);
}
logger.LogDebug("Sub workflow {Name} ({SubId}) completed", SubWorkflowId,
context.ExecutionPointer.EventKey);
var persistenceProvider = scope.ServiceProvider.GetRequiredService();
-
- Result = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result.Data;
+ var workflowInstance = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result;
+ if (workflowInstance.Status == WorkflowStatus.Terminated)
+ {
+ throw new NotImplementedException(workflowInstance.Status.ToString());
+ }
+
+ Result = workflowInstance.Data;
return ExecutionResult.Next();
}
diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs
index 7d8e1000a..0cdff408a 100755
--- a/src/WorkflowCore/Services/WorkflowExecutor.cs
+++ b/src/WorkflowCore/Services/WorkflowExecutor.cs
@@ -227,6 +227,12 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
return;
}
+ if (workflow.Status == WorkflowStatus.Terminated)
+ {
+ await OnTerminated(workflow, def);
+ return;
+ }
+
foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && (x.Children ?? new List()).Count == 0))
{
if (!pointer.SleepUntil.HasValue)
@@ -284,5 +290,28 @@ private async Task OnComplete(WorkflowInstance workflow, WorkflowDefinition def)
Version = workflow.Version
});
}
+
+ private async Task OnTerminated(WorkflowInstance workflow, WorkflowDefinition def)
+ {
+ workflow.Status = WorkflowStatus.Terminated;
+ workflow.CompleteTime = _datetimeProvider.UtcNow;
+
+ using (var scope = _serviceProvider.CreateScope())
+ {
+ var middlewareRunner = scope.ServiceProvider.GetRequiredService();
+ await middlewareRunner.RunPostMiddleware(workflow, def);
+ }
+
+ _logger.LogDebug("Workflow {WorkflowDefinitionId} ({Id}) terminated", workflow.WorkflowDefinitionId, workflow.Id);
+
+ _publisher.PublishNotification(new WorkflowTerminated
+ {
+ EventTimeUtc = _datetimeProvider.UtcNow,
+ Reference = workflow.Reference,
+ WorkflowInstanceId = workflow.Id,
+ WorkflowDefinitionId = workflow.WorkflowDefinitionId,
+ Version = workflow.Version
+ });
+ }
}
}
\ No newline at end of file
diff --git a/src/WorkflowCore/Services/WorkflowHost.cs b/src/WorkflowCore/Services/WorkflowHost.cs
index 0fe74b770..6fc8caf69 100644
--- a/src/WorkflowCore/Services/WorkflowHost.cs
+++ b/src/WorkflowCore/Services/WorkflowHost.cs
@@ -165,10 +165,13 @@ public Task TerminateWorkflow(string workflowId)
public void HandleLifeCycleEvent(LifeCycleEvent evt)
{
- if (evt is WorkflowCompleted completed)
+ switch (evt)
{
- _workflowController.PublishEvent(nameof(WorkflowCompleted), completed.WorkflowInstanceId,
- completed.Reference);
+ // publish the event as sub workflow lifecycle event
+ case WorkflowCompleted _:
+ case WorkflowTerminated _:
+ _workflowController.PublishEvent(nameof(SubWorkflowLifeCycleEvent), evt.WorkflowInstanceId, evt.Reference);
+ break;
}
OnLifeCycleEvent?.Invoke(evt);
diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs
index 22ba680f5..b06f275b4 100644
--- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs
+++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs
@@ -100,6 +100,10 @@ public async Task GetWorkflowInstance(string Id, CancellationT
{
using (var db = ConstructDbContext())
{
+ if (!Guid.TryParse(Id, out _))
+ {
+
+ }
var uid = new Guid(Id);
var raw = await db.Set()
.Include(wf => wf.ExecutionPointers)
diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/SubWorkflowScenario.cs
similarity index 78%
rename from test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs
rename to test/WorkflowCore.IntegrationTests/Scenarios/SubWorkflowScenario.cs
index dced900c5..dc87c60bd 100644
--- a/test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs
+++ b/test/WorkflowCore.IntegrationTests/Scenarios/SubWorkflowScenario.cs
@@ -9,7 +9,7 @@
namespace WorkflowCore.IntegrationTests.Scenarios;
-public class ApprovalScenario : WorkflowTest
+public class SubWorkflowScenario : WorkflowTest
{
public class ApprovalInput
{
@@ -28,6 +28,7 @@ public class ParentWorkflow : IWorkflow
public void Build(IWorkflowBuilder builder)
{
builder
+ .UseDefaultErrorBehavior(WorkflowErrorHandling.Terminate)
.StartWith(context => ExecutionResult.Next())
.SubWorkflow(nameof(ChildWorkflow))
.Output(i => i.Approved, step => ((ApprovalInput)step.Result).Approved)
@@ -48,6 +49,7 @@ public class ChildWorkflow : IWorkflow
public void Build(IWorkflowBuilder builder)
{
builder
+ .UseDefaultErrorBehavior(WorkflowErrorHandling.Terminate)
.StartWith(context => ExecutionResult.Next())
.Parallel()
.Do(then
@@ -71,7 +73,7 @@ public void Build(IWorkflowBuilder builder)
}
}
- public ApprovalScenario()
+ public SubWorkflowScenario()
{
Setup();
}
@@ -114,6 +116,34 @@ public void Scenario(bool approved)
});
}
+ [Fact]
+ public void Failure()
+ {
+ Host.Registry.RegisterWorkflow(new ChildWorkflow());
+
+ var eventKey = Guid.NewGuid().ToString();
+ var workflowId = StartWorkflow(new ApprovalInput
+ {
+ Id = eventKey,
+ TimeSpan = TimeSpan.FromMinutes(10)
+ });
+
+ WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(5));
+ UnhandledStepErrors.Should().BeEmpty();
+
+ Host.PublishEvent("Approved", workflowId, new
+ {
+ Approved = "string"
+ });
+
+ WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(20));
+
+ System.Threading.Thread.Sleep(2000);
+
+ UnhandledStepErrors.Should().NotBeEmpty();
+ GetStatus(workflowId).Should().Be(WorkflowStatus.Terminated);
+ }
+
[Fact]
public void Timeout()
{
diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSubWorkflowScenario.cs
similarity index 86%
rename from test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs
rename to test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSubWorkflowScenario.cs
index b70314652..6fc07c5a6 100644
--- a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs
+++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSubWorkflowScenario.cs
@@ -6,7 +6,7 @@
namespace WorkflowCore.Tests.SqlServer.Scenarios
{
[Collection("SqlServer collection")]
- public class SqlServerApprovalScenario() : ApprovalScenario()
+ public class SqlServerSubWorkflowScenario() : SubWorkflowScenario()
{
protected override void ConfigureServices(IServiceCollection services)
{
diff --git a/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteSubWorkflowScenario.cs
similarity index 89%
rename from test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs
rename to test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteSubWorkflowScenario.cs
index c910c201c..d1e2ae06a 100644
--- a/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs
+++ b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteSubWorkflowScenario.cs
@@ -7,7 +7,7 @@
namespace WorkflowCore.Tests.Sqlite.Scenarios
{
[Collection("Sqlite collection")]
- public class SqliteApprovalScenario : ApprovalScenario
+ public class SqliteSubWorkflowScenario : SubWorkflowScenario
{
protected override void ConfigureServices(IServiceCollection services)
{
From de2a6d5deadc0b4c46f1d2d14497df72f6031615 Mon Sep 17 00:00:00 2001
From: Karel Frajtak <1510452+kfrajtak@users.noreply.github.com>
Date: Mon, 30 Jun 2025 09:50:18 +0200
Subject: [PATCH 4/4] ExecutionError.Message will contain full exception
information
---
src/WorkflowCore/Services/WorkflowExecutor.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs
index 0cdff408a..205e30945 100755
--- a/src/WorkflowCore/Services/WorkflowExecutor.cs
+++ b/src/WorkflowCore/Services/WorkflowExecutor.cs
@@ -89,7 +89,7 @@ public async Task Execute(WorkflowInstance workflow, Can
WorkflowId = workflow.Id,
ExecutionPointerId = pointer.Id,
ErrorTime = _datetimeProvider.UtcNow,
- Message = ex.Message
+ Message = ex.ToString()
});
_executionResultProcessor.HandleStepException(workflow, def, pointer, step, ex);