Skip to content

Commit c86f875

Browse files
authored
Add http xa sample (#71)
* add http xa sample * Add XA pattern comment in README * Change the DBType of DtmOptions to SqlDbType in the configuration section of the README
1 parent 37e4945 commit c86f875

File tree

4 files changed

+188
-13
lines changed

4 files changed

+188
-13
lines changed

README-cn.md

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ services.AddDtmcli(x =>
8787
x.BranchTimeout = 10000;
8888

8989
// 子事务屏障的数据库类型, mysql, postgres, sqlserver
90-
x.DBType = "mysql";
90+
x.SqlDbType = "mysql";
9191

9292
// 子事务屏障的数据表名
9393
x.BarrierTableName = "dtm_barrier.barrier";
@@ -108,7 +108,7 @@ services.AddDtmcli(Configuration, "dtm");
108108
"DtmUrl": "http://localhost:36789",
109109
"DtmTimeout": 10000,
110110
"BranchTimeout": 10000,
111-
"DBType": "mysql",
111+
"SqlDbType": "mysql",
112112
"BarrierTableName": "dtm_barrier.barrier",
113113
}
114114
}
@@ -265,12 +265,32 @@ public class MyBusi
265265
this._globalTransaction = globalTransaction;
266266
}
267267

268-
public async Task DoBusAsync()
268+
public async Task DoBusAsync(CancellationToken cancellationToken)
269269
{
270+
var svc = "http://localhost:5005";
271+
270272
await _globalTransaction.ExcecuteAsync(async (Xa xa) =>
271273
{
272-
await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);
273-
await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);
274+
// NOTE: XA 模式的限制
275+
// 当前模式仅支持mysql、postgresDB,请修改相应的客户端配置,如SqlDbType等。
276+
// 如使用Mysql并且版本低于8.0需关闭连接池使用
277+
278+
// 调用 XA 子事务
279+
await xa.CallBranch(
280+
// 参数
281+
new TransRequest("1", -30),
282+
283+
// 操作的 URL
284+
svc + "/XaTransOut",
285+
286+
// 取消令牌
287+
cancellationToken);
288+
289+
await xa.CallBranch(
290+
new TransRequest("2", 30),
291+
svc + "/XaTransIn",
292+
cancellationToken);
293+
274294
}, cancellationToken);
275295
}
276296
}

README.md

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ services.AddDtmcli(x =>
9494
x.BranchTimeout = 10000;
9595

9696
// barrier database type, mysql, postgres, sqlserver
97-
x.DBType = "mysql";
97+
x.SqlDbType = "mysql";
9898

9999
// barrier table name
100100
x.BarrierTableName = "dtm_barrier.barrier";
@@ -115,7 +115,7 @@ And the configuration file
115115
"DtmUrl": "http://localhost:36789",
116116
"DtmTimeout": 10000,
117117
"BranchTimeout": 10000,
118-
"DBType": "mysql",
118+
"SqlDbType": "mysql",
119119
"BarrierTableName": "dtm_barrier.barrier",
120120
}
121121
}
@@ -273,12 +273,32 @@ public class MyBusi
273273
this._globalTransaction = globalTransaction;
274274
}
275275

276-
public async Task DoBusAsync()
276+
public async Task DoBusAsync(CancellationToken cancellationToken)
277277
{
278+
var svc = "http://localhost:5005";
279+
278280
await _globalTransaction.ExcecuteAsync(async (Xa xa) =>
279281
{
280-
await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);
281-
await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);
282+
// NOTE: Limitations of using Xa mode
283+
// The current mode only supports mysql, postgresDB, please modify the corresponding client configuration, such as SqlDbType, etc.
284+
// Connection pooling needs to be turned off for mysql versions below 8.0
285+
286+
// Create XA sub-transaction
287+
await xa.CallBranch(
288+
// Arguments of action
289+
new TransRequest("1", -30),
290+
291+
// URL of action
292+
svc + "/XaTransOut",
293+
294+
// Cancel token
295+
cancellationToken);
296+
297+
await xa.CallBranch(
298+
new TransRequest("2", 30),
299+
svc + "/XaTransIn",
300+
cancellationToken);
301+
282302
}, cancellationToken);
283303
}
284304
}

samples/DtmSample/Controllers/TransController.cs

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1-
using DtmSample.Dtos;
1+
using Dapper;
2+
using Dtmcli;
3+
using DtmSample.Dtos;
4+
using Microsoft.AspNetCore.Http;
25
using Microsoft.AspNetCore.Mvc;
36
using Microsoft.Extensions.Logging;
7+
using MySqlConnector;
8+
using System.Threading;
9+
using System.Threading.Tasks;
410

511
namespace DtmSample.Controllers
612
{
@@ -10,10 +16,12 @@ namespace DtmSample.Controllers
1016
public class TransController : ControllerBase
1117
{
1218
private readonly ILogger<TransController> _logger;
19+
private readonly XaLocalTransaction _xaTrans;
1320

14-
public TransController(ILogger<TransController> logger)
21+
public TransController(ILogger<TransController> logger, XaLocalTransaction xaTrans)
1522
{
1623
_logger = logger;
24+
_xaTrans = xaTrans;
1725
}
1826

1927
#region TCC
@@ -119,7 +127,42 @@ public IActionResult TransInRevert([FromBody] TransRequest body)
119127
_logger.LogInformation("TransInConfirm, QueryString={0}", Request.QueryString);
120128
_logger.LogInformation("用户: {0},转入 {1} 元---回滚", body.UserId, body.Amount);
121129
return Ok(TransResponse.BuildSucceedResponse());
122-
}
130+
}
131+
#endregion
132+
133+
#region Xa
134+
[HttpPost("XaTransOut")]
135+
public async Task<IActionResult> XaTransOut(CancellationToken token)
136+
{
137+
//todo: Connection pooling needs to be turned off for mysql versions below 8.0
138+
using var conn = new MySqlConnection("Server=en.dtm.pub; Port=3306; User ID=dtm; Password=passwd123dtm; Database=dtm_busi;Pooling=False");
139+
await this._xaTrans.ExcecuteAsync(this.Request.Query, conn, "mysql", async (dbConn, xa) =>
140+
{
141+
var body = await this.Request.ReadFromJsonAsync<TransRequest>();
142+
await dbConn.ExecuteAsync($"UPDATE dtm_busi.user_account SET balance = balance + {body.Amount} where user_id = '{body.UserId}'");
143+
_logger.LogInformation("XaTransOut, QueryString={0}", Request.QueryString);
144+
_logger.LogInformation("用户: {0},转出 {1} 元", body.UserId, body.Amount);
145+
}, token);
146+
147+
return Ok(TransResponse.BuildSucceedResponse());
148+
}
149+
150+
[HttpPost("XaTransIn")]
151+
public async Task<IActionResult> XaTransIn(CancellationToken token)
152+
{
153+
//todo: Connection pooling needs to be turned off for mysql versions below 8.0
154+
using var conn = new MySqlConnection("Server=en.dtm.pub; Port=3306; User ID=dtm; Password=passwd123dtm; Database=dtm_busi;Pooling=False");
155+
await this._xaTrans.ExcecuteAsync(this.Request.Query, conn, "mysql", async (dbConn, xa) =>
156+
{
157+
var body = await this.Request.ReadFromJsonAsync<TransRequest>();
158+
await dbConn.ExecuteAsync($"UPDATE dtm_busi.user_account SET balance = balance + {body.Amount} where user_id = '{body.UserId}'");
159+
_logger.LogInformation("XaTransIn, QueryString={0}", Request.QueryString);
160+
_logger.LogInformation("用户: {0},转入 {1} 元", body.UserId, body.Amount);
161+
}, token);
162+
163+
return Ok(TransResponse.BuildSucceedResponse());
164+
}
123165
#endregion
124166
}
125167
}
168+
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
using Dtmcli;
2+
using DtmSample.Dtos;
3+
using Microsoft.AspNetCore.Mvc;
4+
using Microsoft.Extensions.Logging;
5+
using Microsoft.Extensions.Options;
6+
using System;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
10+
namespace DtmSample.Controllers
11+
{
12+
/// <summary>
13+
/// XA 示例
14+
/// </summary>
15+
[ApiController]
16+
[Route("/api")]
17+
public class XaTestController : ControllerBase
18+
{
19+
20+
private readonly ILogger<TccTestController> _logger;
21+
private readonly XaGlobalTransaction _globalTransaction;
22+
private readonly AppSettings _settings;
23+
24+
public XaTestController(ILogger<TccTestController> logger, IOptions<AppSettings> optionsAccs, XaGlobalTransaction transaction)
25+
{
26+
_logger = logger;
27+
_settings = optionsAccs.Value;
28+
_globalTransaction = transaction;
29+
}
30+
31+
/// <summary>
32+
/// Xa 成功提交
33+
/// </summary>
34+
/// <param name="cancellationToken"></param>
35+
/// <returns></returns>
36+
[HttpPost("commit")]
37+
public async Task<IActionResult> Commit(CancellationToken cancellationToken)
38+
{
39+
//todo: Currently only supported by mysql, please modify the appsettings.json
40+
try
41+
{
42+
await _globalTransaction.ExcecuteAsync(async (Xa xa) =>
43+
{
44+
//// 用户1 转出30元
45+
var res1 = await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);
46+
47+
//// 用户2 转入30元
48+
var res2 = await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);
49+
}, cancellationToken);
50+
51+
return Ok(TransResponse.BuildSucceedResponse());
52+
}
53+
catch (Exception ex)
54+
{
55+
_logger.LogError(ex, "Xa Error");
56+
return Ok(TransResponse.BuildFailureResponse());
57+
}
58+
}
59+
60+
61+
/// <summary>
62+
/// Xa 失败回滚
63+
/// </summary>
64+
/// <param name="cancellationToken"></param>
65+
/// <returns></returns>
66+
[HttpPost("rollbcak")]
67+
public async Task<IActionResult> Rollbcak(CancellationToken cancellationToken)
68+
{
69+
//todo: Currently only supported by mysql, please modify the appsettings.json
70+
try
71+
{
72+
await _globalTransaction.ExcecuteAsync(async (Xa xa) =>
73+
{
74+
//// 用户1 转出30元
75+
var res1 = await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);
76+
77+
//// 用户2 转入30元
78+
var res2 = await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);
79+
80+
throw new Exception("rollbcak");
81+
}, cancellationToken);
82+
83+
return Ok(TransResponse.BuildSucceedResponse());
84+
}
85+
catch (Exception ex)
86+
{
87+
_logger.LogError(ex, "Xa Error");
88+
return Ok(TransResponse.BuildFailureResponse());
89+
}
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)