Skip to content

Commit fe25d97

Browse files
Feature http xa (#69)
* add http xa implementation * Fix the key conflict caused by repeated registration of DI * xa local transaction handle barrier exception * add XA patter README * Ignore repeat commit or rollback with the same id error * Adjust the Connection opening to asynchronous Co-authored-by: Catcher Wong <catcher_hwq@outlook.com> * Adjust the use of "xa" literals to TYPE_XA constants Co-authored-by: Catcher Wong <catcher_hwq@outlook.com> * remove _special duplicate get * Adjust Xa's FromQuery, Excecute function signatures, and add parameter checks * Change the parameter name quersy to query * Modify ‘Excecute’ to ‘ExcecuteAsync’ in xa pattern in README --------- Co-authored-by: Catcher Wong <catcher_hwq@outlook.com>
1 parent 10272be commit fe25d97

File tree

11 files changed

+365
-12
lines changed

11 files changed

+365
-12
lines changed

README-cn.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,28 @@ public class MyBusi
253253
}
254254
```
255255

256+
### XA pattern
257+
258+
```cs
259+
public class MyBusi
260+
{
261+
private readonly Dtmcli.XaGlobalTransaction _globalTransaction;
262+
263+
public MyBusi(Dtmcli.XaGlobalTransaction globalTransaction)
264+
{
265+
this._globalTransaction = globalTransaction;
266+
}
267+
268+
public async Task DoBusAsync()
269+
{
270+
await _globalTransaction.ExcecuteAsync(async (Xa xa) =>
271+
{
272+
await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);
273+
await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);
274+
}, cancellationToken);
275+
}
276+
}
277+
```
256278

257279
## 可运行的使用示例
258280

README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,30 @@ public class MyBusi
260260
}
261261
```
262262

263+
### XA pattern
264+
265+
266+
```cs
267+
public class MyBusi
268+
{
269+
private readonly Dtmcli.XaGlobalTransaction _globalTransaction;
270+
271+
public MyBusi(Dtmcli.XaGlobalTransaction globalTransaction)
272+
{
273+
this._globalTransaction = globalTransaction;
274+
}
275+
276+
public async Task DoBusAsync()
277+
{
278+
await _globalTransaction.ExcecuteAsync(async (Xa xa) =>
279+
{
280+
await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);
281+
await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);
282+
}, cancellationToken);
283+
}
284+
}
285+
```
286+
263287
## Complete example
264288

265289

src/DtmCommon/Constant.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ public class Constant
77
public static readonly string TYPE_TCC = "tcc";
88
public static readonly string TYPE_SAGA = "saga";
99
public static readonly string TYPE_MSG = "msg";
10+
public static readonly string TYPE_XA = "xa";
1011

1112
public static readonly string ResultFailure = "FAILURE";
1213
public static readonly string ResultSuccess = "SUCCESS";

src/DtmCommon/Imp/DbSpecialDelegate.cs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,37 @@ public class DbSpecialDelegate
88
{
99
private readonly IDbSpecial _special;
1010

11+
private readonly Dictionary<string, IDbSpecial> _specialDic;
12+
1113
public DbSpecialDelegate(IEnumerable<IDbSpecial> specials, IOptions<DtmOptions> optionsAccs)
1214
{
13-
var dbSpecial = specials.FirstOrDefault(x => x.Name.Equals(optionsAccs.Value.SqlDbType));
15+
this._specialDic = GetSpecialDictionary(specials);
16+
if (this._specialDic.TryGetValue(optionsAccs.Value.SqlDbType, out _special) == false)
17+
throw new DtmException($"unknown db type '{optionsAccs.Value.SqlDbType}'");
18+
}
1419

15-
if (dbSpecial == null) throw new DtmException($"unknown db type '{optionsAccs.Value.SqlDbType}'");
20+
public IDbSpecial GetDbSpecial() => _special;
21+
22+
public IDbSpecial GetDbSpecialByName(string sqlDbType)
23+
{
24+
IDbSpecial special;
25+
if (this._specialDic.TryGetValue(sqlDbType, out special) == false)
26+
throw new DtmException($"unknown db type '{sqlDbType}'");
1627

17-
_special = dbSpecial;
28+
return special;
1829
}
1930

20-
public IDbSpecial GetDbSpecial() => _special;
31+
Dictionary<string, IDbSpecial> GetSpecialDictionary(IEnumerable<IDbSpecial> specials)
32+
{
33+
Dictionary<string, IDbSpecial> specialDic = new();
34+
foreach (var special in specials)
35+
{
36+
if (specialDic.ContainsKey(special.Name) == false)
37+
specialDic.Add(special.Name, special);
38+
}
39+
40+
return specialDic;
41+
}
2142
}
2243

2344
}

src/Dtmcli/Constant.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ internal static class Request
4343

4444
internal const string OPERATION_REGISTERBRANCH = "registerBranch";
4545

46+
internal const string PHASE2_URL = "phase2_url";
47+
4648
/// <summary>
4749
/// branch type for message, SAGA, XA
4850
/// </summary>

src/Dtmcli/DtmClient.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ namespace Dtmcli
1313
public class DtmClient : IDtmClient
1414
{
1515
private static readonly char Slash = '/';
16-
private static readonly string QueryStringFormat = "dtm={0}&gid={1}&trans_type={2}&branch_id={3}&op={4}";
1716
private static readonly string QuestionMark = "?";
1817
private static readonly string And = "&";
1918

@@ -91,13 +90,10 @@ public async Task TransRegisterBranch(TransBase tb, Dictionary<string, string> a
9190

9291
public async Task<HttpResponseMessage> TransRequestBranch(TransBase tb, HttpMethod method, object body, string branchID, string op, string url, CancellationToken cancellationToken)
9392
{
94-
var queryParams = string.Format(
95-
QueryStringFormat,
96-
string.Concat(_dtmOptions.DtmUrl.TrimEnd(Slash), Constant.Request.URLBASE_PREFIX),
97-
tb.Gid,
98-
tb.TransType,
99-
branchID,
100-
op);
93+
var uriPath = string.Concat(_dtmOptions.DtmUrl.TrimEnd(Slash), Constant.Request.URLBASE_PREFIX);
94+
var queryParams = tb.TransType == DtmCommon.Constant.TYPE_XA ?
95+
$"dtm={uriPath}&gid={tb.Gid}&trans_type={tb.TransType}&branch_id={branchID}&op={op}&phase2_url={url}" :
96+
$"dtm={uriPath}&gid={tb.Gid}&trans_type={tb.TransType}&branch_id={branchID}&op={op}";
10197

10298
var client = _httpClientFactory.CreateClient(Constant.BranchClientHttpName);
10399

src/Dtmcli/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ private static void AddDtmCore(IServiceCollection services)
9292
services.TryAddSingleton<IDtmTransFactory, DtmTransFactory>();
9393
services.TryAddSingleton<IDtmClient, DtmClient>();
9494
services.TryAddSingleton<TccGlobalTransaction>();
95+
services.TryAddSingleton<XaGlobalTransaction>();
96+
services.TryAddSingleton<XaLocalTransaction>();
9597

9698
DtmCommon.ServiceCollectionExtensions.AddDtmCommon(services);
9799

src/Dtmcli/Xa/Xa.cs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
using Dtmcli.DtmImp;
2+
using DtmCommon;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Net.Http;
6+
using System.Text.Json.Serialization;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using static Dtmcli.Constant;
10+
11+
namespace Dtmcli
12+
{
13+
public sealed class Xa : TransBase
14+
{
15+
private readonly IDtmClient _dtmClient;
16+
17+
internal Xa(IDtmClient dtmHttpClient, string gid)
18+
{
19+
this._dtmClient = dtmHttpClient;
20+
this.Gid = gid;
21+
this.TransType = DtmCommon.Constant.TYPE_XA;
22+
this.BranchIDGen = new BranchIDGen();
23+
}
24+
25+
internal Xa(IDtmClient dtmHttpClient)
26+
{
27+
this._dtmClient = dtmHttpClient;
28+
}
29+
30+
public async Task<string> CallBranch(object body, string url, CancellationToken cancellationToken = default)
31+
{
32+
using var response = await _dtmClient.TransRequestBranch(
33+
this,
34+
HttpMethod.Post,
35+
body,
36+
this.BranchIDGen.NewSubBranchID(),
37+
Constant.Request.BRANCH_ACTION,
38+
url,
39+
cancellationToken).ConfigureAwait(false);
40+
41+
Exception ex = await Utils.RespAsErrorCompatible(response);
42+
if (null != ex)
43+
throw ex;
44+
45+
return await response.Content.ReadAsStringAsync();
46+
}
47+
48+
[JsonIgnore]
49+
public string Phase2Url { get; set; }
50+
51+
52+
#if NET5_0_OR_GREATER
53+
public static Xa FromQuery(IDtmClient dtmClient, Microsoft.AspNetCore.Http.IQueryCollection query)
54+
{
55+
if (query.TryGetValue(Request.GID, out var gid) == false || string.IsNullOrEmpty(gid))
56+
throw new ArgumentNullException(Request.GID);
57+
58+
if (query.TryGetValue(Request.TRANS_TYPE, out var transType) == false || string.IsNullOrEmpty(transType))
59+
throw new ArgumentNullException(Request.TRANS_TYPE);
60+
61+
if (query.TryGetValue(Request.OP, out var op) == false || string.IsNullOrEmpty(op))
62+
throw new ArgumentNullException(Request.OP);
63+
64+
if (query.TryGetValue(Request.BRANCH_ID, out var branchID) == false || string.IsNullOrEmpty(branchID))
65+
throw new ArgumentNullException(Request.BRANCH_ID);
66+
67+
query.TryGetValue(Request.DTM, out var dtm);
68+
query.TryGetValue(Request.PHASE2_URL, out var phase2Url);
69+
70+
return new(dtmClient)
71+
{
72+
Gid = gid,
73+
Dtm = dtm,
74+
Op = op,
75+
TransType = transType,
76+
Phase2Url = phase2Url,
77+
BranchIDGen = new BranchIDGen(branchID),
78+
};
79+
}
80+
#else
81+
public static Xa FromQuery(IDtmClient dtmClient, IDictionary<string, string> query)
82+
{
83+
if (!query.TryGetValue(Request.GID, out var gid) == false || string.IsNullOrEmpty(gid))
84+
throw new ArgumentNullException(Request.GID);
85+
86+
if (query.TryGetValue(Request.TRANS_TYPE, out var transType) == false || string.IsNullOrEmpty(transType))
87+
throw new ArgumentNullException(Request.TRANS_TYPE);
88+
89+
if (query.TryGetValue(Request.OP, out var op) == false || string.IsNullOrEmpty(op))
90+
throw new ArgumentNullException(Request.OP);
91+
92+
if (query.TryGetValue(Request.BRANCH_ID, out var branchID) == false || string.IsNullOrEmpty(branchID))
93+
throw new ArgumentNullException(Request.BRANCH_ID);
94+
95+
query.TryGetValue(Request.DTM, out var dtm);
96+
query.TryGetValue(Request.PHASE2_URL, out var phase2Url);
97+
98+
return new(dtmClient)
99+
{
100+
Gid = gid,
101+
Dtm = dtm,
102+
Op = op,
103+
TransType = transType,
104+
Phase2Url = phase2Url,
105+
BranchIDGen = new BranchIDGen(branchID),
106+
};
107+
}
108+
#endif
109+
}
110+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
using DtmCommon;
2+
using Microsoft.Extensions.Logging;
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace Dtmcli
8+
{
9+
public sealed class XaGlobalTransaction
10+
{
11+
private readonly IDtmClient _dtmClient;
12+
private readonly ILogger _logger;
13+
14+
public XaGlobalTransaction(IDtmClient dtmClient, ILoggerFactory factory)
15+
{
16+
this._dtmClient = dtmClient;
17+
this._logger = factory.CreateLogger<XaGlobalTransaction>();
18+
}
19+
20+
public async Task<string> ExcecuteAsync(Func<Xa, Task> xa_cb, CancellationToken cancellationToken = default)
21+
{
22+
var gid = await _dtmClient.GenGid(cancellationToken);
23+
await this.Excecute(gid, xa_cb, cancellationToken);
24+
return gid;
25+
}
26+
27+
public async Task Excecute(string gid, Func<Xa, Task> xa_cb, CancellationToken cancellationToken = default)
28+
{
29+
await ExcecuteAsync(gid, null, xa_cb, cancellationToken);
30+
}
31+
32+
public async Task ExcecuteAsync(string gid, Action<Xa> custom, Func<Xa, Task> xa_cb, CancellationToken cancellationToken = default)
33+
{
34+
Xa xa = new(this._dtmClient, gid);
35+
if (null != custom)
36+
custom(xa);
37+
38+
try
39+
{
40+
await _dtmClient.TransCallDtm(null, xa, Constant.Request.OPERATION_PREPARE, cancellationToken);
41+
await xa_cb(xa);
42+
await _dtmClient.TransCallDtm(null, xa, Constant.Request.OPERATION_SUBMIT, cancellationToken);
43+
}
44+
catch (Exception ex)
45+
{
46+
xa.RollbackReason = ex.Message.Substring(0, ex.Message.Length > 1023 ? 1023 : ex.Message.Length);
47+
_logger.LogError(ex, "prepare or submitting global transaction error");
48+
await _dtmClient.TransCallDtm(null, xa, Constant.Request.OPERATION_ABORT, cancellationToken);
49+
}
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)