Skip to content

Commit 2150dcc

Browse files
authored
sample: sample for workflow ExecuteByQS(http)&& correct typo in WorkflowGlobalTransaction class name (#92)
* sample: sample for workflow ExecuteByQS - Implement wf-crash endpoint to simulate workflow crash - Implement wf-resume endpoint for workflow http callback - Modify WorlflowGlobalTransaction.cs to add Exists method for debug only. * refactor(Dtmworkflow): correct typo in WorkflowGlobalTransaction class name - Rename WorlflowGlobalTransaction to WorkflowGlobalTransaction - Update references in ServiceCollectionExtensions, WfTestController, and tests * test: add unit tests for WorkflowGlobalTransaction.Exists method
1 parent 9926030 commit 2150dcc

File tree

6 files changed

+142
-16
lines changed

6 files changed

+142
-16
lines changed

samples/DtmSample/Controllers/WfTestController.cs

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
using Microsoft.Extensions.Logging;
77
using Microsoft.Extensions.Options;
88
using System;
9+
using System.IO;
910
using System.Net.Http;
1011
using System.Net.Http.Headers;
1112
using System.Text;
1213
using System.Text.Json;
14+
using System.Text.Unicode;
1315
using System.Threading;
1416
using System.Threading.Tasks;
1517

@@ -21,10 +23,10 @@ public class WfTestController : ControllerBase
2123
{
2224

2325
private readonly ILogger<WfTestController> _logger;
24-
private readonly WorlflowGlobalTransaction _globalTransaction;
26+
private readonly WorkflowGlobalTransaction _globalTransaction;
2527
private readonly AppSettings _settings;
2628

27-
public WfTestController(ILogger<WfTestController> logger, IOptions<AppSettings> optionsAccs, WorlflowGlobalTransaction transaction)
29+
public WfTestController(ILogger<WfTestController> logger, IOptions<AppSettings> optionsAccs, WorkflowGlobalTransaction transaction)
2830
{
2931
_logger = logger;
3032
_settings = optionsAccs.Value;
@@ -253,5 +255,84 @@ public async Task<IActionResult> TccRollBack(CancellationToken cancellationToken
253255
return Ok(TransResponse.BuildFailureResponse());
254256
}
255257
}
258+
259+
260+
private static readonly string wfNameForResume = "wfNameForResume";
261+
262+
/// <summary>
263+
///
264+
/// </summary>
265+
/// <param name="cancellationToken"></param>
266+
/// <returns></returns>
267+
[HttpPost("wf-crash")]
268+
public async Task<IActionResult> Crash(CancellationToken cancellationToken)
269+
{
270+
if (!_globalTransaction.Exists(wfNameForResume))
271+
{
272+
_globalTransaction.Register(wfNameForResume, async (wf, data) =>
273+
{
274+
var content = new ByteArrayContent(data);
275+
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
276+
277+
var outClient = wf.NewBranch().NewRequest();
278+
await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content);
279+
280+
// the first branch succeed, then crashed, the dtm server will call back the flowing wf-call-back
281+
// manual stop application
282+
Environment.Exit(0);
283+
284+
var inClient = wf.NewBranch().NewRequest();
285+
await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content);
286+
287+
return null;
288+
});
289+
}
290+
291+
var req = JsonSerializer.Serialize(new TransRequest("1", -30));
292+
await _globalTransaction.Execute(wfNameForResume, Guid.NewGuid().ToString("N"), Encoding.UTF8.GetBytes(req), true);
293+
294+
return Ok(TransResponse.BuildSucceedResponse());
295+
}
296+
297+
[HttpPost("wf-resume")]
298+
public async Task<IActionResult> WfResume(CancellationToken cancellationToken)
299+
{
300+
try
301+
{
302+
if (!_globalTransaction.Exists(wfNameForResume))
303+
{
304+
// register again after manual crash by Environment.Exit(0);
305+
_globalTransaction.Register(wfNameForResume, async (wf, data) =>
306+
{
307+
var content = new ByteArrayContent(data);
308+
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
309+
310+
var outClient = wf.NewBranch().NewRequest();
311+
await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content);
312+
313+
var inClient = wf.NewBranch().NewRequest();
314+
await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content);
315+
316+
return null;
317+
});
318+
}
319+
320+
// prepared call ExecuteByQS
321+
using var bodyMemoryStream = new MemoryStream();
322+
await Request.Body.CopyToAsync(bodyMemoryStream, cancellationToken);
323+
byte[] bytes = bodyMemoryStream.ToArray();
324+
string body = Encoding.UTF8.GetString(bytes);
325+
_logger.LogDebug($"body: {body}");
326+
327+
await _globalTransaction.ExecuteByQS(Request.Query, bodyMemoryStream.ToArray());
328+
329+
return Ok(TransResponse.BuildSucceedResponse());
330+
}
331+
catch (Exception ex)
332+
{
333+
_logger.LogError(ex, "Workflow Error");
334+
return Ok(TransResponse.BuildFailureResponse());
335+
}
336+
}
256337
}
257338
}

samples/DtmSample/Startup.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public void ConfigureServices(IServiceCollection services)
2929
dtm.SqlDbType = Configuration.GetValue<string>("AppSettings:SqlDbType");
3030
dtm.BarrierSqlTableName = Configuration.GetValue<string>("AppSettings:BarrierSqlTableName");
3131
dtm.DtmGrpcUrl = Configuration.GetValue<string>("AppSettings:DtmGrpcUrl");
32-
dtm.HttpCallback = "";
32+
dtm.HttpCallback = $"{Configuration.GetValue<string>("AppSettings:BusiUrl")}/wf-resume";
3333
dtm.GrpcCallback = "";
3434
});
3535

src/Dtmworkflow/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services
2121
services.AddDtmGrpc(setupAction);
2222

2323
services.TryAddSingleton<IWorkflowFactory, WorkflowFactory>();
24-
services.TryAddSingleton<WorlflowGlobalTransaction>();
24+
services.TryAddSingleton<WorkflowGlobalTransaction>();
2525

2626
return services;
2727
}
@@ -32,7 +32,7 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services
3232
services.AddDtmGrpc(configuration, sectionName);
3333

3434
services.TryAddSingleton<IWorkflowFactory, WorkflowFactory>();
35-
services.TryAddSingleton<WorlflowGlobalTransaction>();
35+
services.TryAddSingleton<WorkflowGlobalTransaction>();
3636

3737
return services;
3838
}

src/Dtmworkflow/WorlflowGlobalTransaction.cs renamed to src/Dtmworkflow/WorkflowGlobalTransaction.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,17 @@
66

77
namespace Dtmworkflow
88
{
9-
public class WorlflowGlobalTransaction
9+
public class WorkflowGlobalTransaction
1010
{
1111
private readonly Dictionary<string, WfItem> _handlers;
1212
private readonly IWorkflowFactory _workflowFactory;
1313
private readonly ILogger _logger;
1414

15-
public WorlflowGlobalTransaction(IWorkflowFactory workflowFactory, ILoggerFactory loggerFactory)
15+
public WorkflowGlobalTransaction(IWorkflowFactory workflowFactory, ILoggerFactory loggerFactory)
1616
{
1717
this._handlers = new Dictionary<string, WfItem>();
1818
this._workflowFactory = workflowFactory;
19-
this._logger = loggerFactory.CreateLogger<WorlflowGlobalTransaction>();
19+
this._logger = loggerFactory.CreateLogger<WorkflowGlobalTransaction>();
2020
}
2121

2222
public async Task<byte[]> Execute(string name, string gid, byte[] data, bool isHttp = true)
@@ -51,7 +51,7 @@ public void Register(string name, WfFunc2 handler, params Action<Workflow>[] cus
5151
Custom = custom.ToList()
5252
});
5353
}
54-
54+
5555
#if NET5_0_OR_GREATER
5656
public async Task ExecuteByQS(Microsoft.AspNetCore.Http.IQueryCollection query, byte[] body)
5757
{
@@ -61,5 +61,12 @@ public async Task ExecuteByQS(Microsoft.AspNetCore.Http.IQueryCollection query,
6161
await Execute(op, gid, body, true);
6262
}
6363
#endif
64+
65+
#if DEBUG // for sample only
66+
public bool Exists(string name)
67+
{
68+
return this._handlers.ContainsKey(name);
69+
}
70+
#endif
6471
}
6572
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
using Dtmcli;
2+
using Dtmgrpc;
3+
using Microsoft.Extensions.Logging.Abstractions;
4+
using Moq;
5+
6+
namespace Dtmworkflow.Tests;
7+
8+
public class WorkflowGlobalTransactionTest
9+
{
10+
#if DEBUG
11+
[Fact]
12+
public void Exists()
13+
{
14+
var factory = new Mock<IWorkflowFactory>();
15+
var wf = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
16+
17+
Assert.Throws<System.ArgumentNullException>(() => wf.Exists(null));
18+
Assert.False(wf.Exists(string.Empty));
19+
Assert.False(wf.Exists("my-wf1"));
20+
Assert.False(wf.Exists("my-wf2"));
21+
Assert.False(wf.Exists("my-wf3"));
22+
23+
wf.Register("my-wf1", (workflow, data) => null);
24+
wf.Register("my-wf2", (workflow, data) => null);
25+
26+
Assert.Throws<System.ArgumentNullException>(() => wf.Exists(null));
27+
Assert.False(wf.Exists(string.Empty));
28+
Assert.True(wf.Exists("my-wf1"));
29+
Assert.True(wf.Exists("my-wf2"));
30+
Assert.False(wf.Exists("my-wf3"));
31+
32+
var wf2 = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
33+
Assert.False(wf2.Exists("my-wf1"));
34+
Assert.False(wf2.Exists("my-wf2"));
35+
Assert.False(wf2.Exists("my-wf3"));
36+
}
37+
#endif
38+
}

tests/Dtmworkflow.Tests/WorkflowHttpTests.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public async void Execute_Should_Succeed_When_PWF_Succeed()
2727

2828
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
2929

30-
var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
30+
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
3131

3232
var wfName = nameof(Execute_Should_Succeed_When_PWF_Succeed);
3333
var gid = Guid.NewGuid().ToString("N");
@@ -56,7 +56,7 @@ public async void Execute_Should_Throw_DtmFailureException_When_PWF_Failed()
5656

5757
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
5858

59-
var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
59+
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
6060

6161
var wfName = nameof(Execute_Should_Throw_DtmFailureException_When_PWF_Failed);
6262
var gid = Guid.NewGuid().ToString("N");
@@ -90,7 +90,7 @@ public async void Execute_Should_Succeed_When_PWF_Submitted_And_Progress_Not_Fai
9090

9191
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
9292

93-
var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
93+
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
9494

9595
var wfName = nameof(Execute_Should_Succeed_When_PWF_Submitted_And_Progress_Not_Failed);
9696
var gid = Guid.NewGuid().ToString("N");
@@ -125,7 +125,7 @@ public async void Execute_Should_ThrowException_When_WfFunc2_ThrowException()
125125

126126
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
127127

128-
var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
128+
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
129129

130130
var wfName = nameof(Execute_Should_ThrowException_When_WfFunc2_ThrowException);
131131
var gid = Guid.NewGuid().ToString("N");
@@ -159,7 +159,7 @@ public async void Execute_Should_Return_Null_When_WfFunc2_ThrowDtmFailureExcepti
159159

160160
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
161161

162-
var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
162+
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
163163

164164
var wfName = nameof(Execute_Should_Return_Null_When_WfFunc2_ThrowDtmFailureException);
165165
var gid = Guid.NewGuid().ToString("N");
@@ -192,7 +192,7 @@ public async void Rollback_Should_Be_Executed()
192192

193193
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
194194

195-
var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
195+
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
196196

197197
var wfName = nameof(Rollback_Should_Be_Executed);
198198
var gid = Guid.NewGuid().ToString("N");
@@ -232,7 +232,7 @@ public async void Commit_Should_Be_Executed()
232232

233233
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
234234

235-
var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
235+
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
236236

237237
var wfName = nameof(Commit_Should_Be_Executed);
238238
var gid = Guid.NewGuid().ToString("N");

0 commit comments

Comments
 (0)