Skip to content

Commit 0f0c97b

Browse files
author
ffffwh
committed
fix big tx stuck #1040
1 parent bfa981c commit 0f0c97b

File tree

1 file changed

+47
-47
lines changed

1 file changed

+47
-47
lines changed

driver/mysql/applier_incr.go

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,17 @@ func (a *ApplierIncr) MtsWorker(workerIndex int) {
256256

257257
func (a *ApplierIncr) handleEntry(entryCtx *common.EntryContext) (err error) {
258258
binlogEntry := entryCtx.Entry
259-
isBig := binlogEntry.IsPartOfBigTx()
260259
txGno := binlogEntry.Coordinates.GetGNO()
260+
a.logger.Debug("a binlogEntry.", "remaining", len(a.incrBytesQueue), "gno", txGno,
261+
"lc", binlogEntry.Coordinates.GetLastCommit(), "seq", binlogEntry.Coordinates.GetSequenceNumber(),
262+
"index", binlogEntry.Index, "final", binlogEntry.Final)
263+
264+
if binlogEntry.Coordinates.GetSid() == uuid.UUID([16]byte{0}) {
265+
return a.handleEntryOracle(entryCtx)
266+
}
267+
268+
isBig := binlogEntry.IsPartOfBigTx()
269+
txSid := binlogEntry.Coordinates.GetSidStr()
261270

262271
if a.inBigTx && binlogEntry.Index == 0 {
263272
a.logger.Info("bigtx: found resent BinlogEntry", "gno", txGno)
@@ -270,64 +279,55 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.EntryContext) (err error) {
270279
a.inBigTx = false
271280
}
272281

273-
if binlogEntry.Coordinates.GetSid() == uuid.UUID([16]byte{0}) {
274-
return a.handleEntryOracle(entryCtx)
275-
}
276-
txSid := binlogEntry.Coordinates.GetSidStr()
277-
278-
a.logger.Debug("a binlogEntry.", "remaining", len(a.incrBytesQueue),
279-
"gno", txGno, "lc", binlogEntry.Coordinates.GetLastCommit(),
280-
"seq", binlogEntry.Coordinates.GetSequenceNumber())
281-
282-
if txSid == a.MySQLServerUuid {
283-
a.logger.Debug("skipping a dtle tx.", "osid", txSid)
284-
a.EntryExecutedHook(binlogEntry) // make gtid continuous
285-
return nil
286-
}
287-
288-
// Note: the gtidExecuted will be updated after commit. For a big-tx, we determine
289-
// whether to skip for each parts.
290-
291282
// region TestIfExecuted
292-
{
293-
if a.fwdExtractor != nil {
294-
if a.fwdExtractor.binlogReader != nil {
295-
if base.GtidSetContains(&a.fwdExtractor.binlogReader.CurrentGtidSetMutex,
296-
a.fwdExtractor.binlogReader.CurrentGtidSet, txSid, txGno) {
297-
a.logger.Info("skip an fwd executed tx", "sid", txSid, "gno", txGno)
298-
a.EntryExecutedHook(binlogEntry) // make gtid continuous
299-
return nil
300-
}
283+
skipEntry := false
284+
if txSid == a.MySQLServerUuid {
285+
a.logger.Debug("skipping a binlogEntry with the same sid as target.", "sid", txSid)
286+
skipEntry = true
287+
} else if a.fwdExtractor != nil {
288+
if a.fwdExtractor.binlogReader != nil {
289+
if base.GtidSetContains(&a.fwdExtractor.binlogReader.CurrentGtidSetMutex,
290+
a.fwdExtractor.binlogReader.CurrentGtidSet, txSid, txGno) {
291+
skipEntry = true
292+
a.logger.Info("skip an fwd executed tx", "sid", txSid, "gno", txGno)
301293
}
302294
}
295+
} else if base.GtidSetContains(a.gtidSetLock, a.gtidSet, txSid, txGno) {
296+
a.logger.Info("skip an executed tx", "sid", txSid, "gno", txGno)
297+
skipEntry = true
298+
}
303299

304-
if base.GtidSetContains(a.gtidSetLock, a.gtidSet, txSid, txGno) {
305-
a.logger.Info("skip an executed tx", "sid", txSid, "gno", txGno)
306-
return nil
300+
// Note: the gtidExecuted will be updated after commit.
301+
// For a big-tx, we determine whether to skip for each parts.
302+
if skipEntry {
303+
if binlogEntry.Final {
304+
a.EntryExecutedHook(binlogEntry) // make gtid continuous
307305
}
306+
307+
return nil
308308
}
309309
// endregion
310310

311-
// this must be after duplication check
312-
var rotated bool
313-
if a.replayingBinlogFile == binlogEntry.Coordinates.GetLogFile() {
314-
rotated = false
315-
} else {
316-
rotated = true
317-
a.replayingBinlogFile = binlogEntry.Coordinates.GetLogFile()
318-
}
311+
rotated := false
312+
if binlogEntry.Index == 0 {
313+
// this must be after duplication check
314+
if a.replayingBinlogFile != binlogEntry.Coordinates.GetLogFile() {
315+
rotated = true
316+
a.replayingBinlogFile = binlogEntry.Coordinates.GetLogFile()
317+
}
319318

320-
gtidSetItem := a.gtidItemMap.GetItem(binlogEntry.Coordinates.GetSid().(uuid.UUID))
321-
a.logger.Debug("gtidSetItem", "NRow", gtidSetItem.NRow)
322-
if gtidSetItem.NRow >= cleanupGtidExecutedLimit {
323-
err = a.cleanGtidExecuted(binlogEntry.Coordinates.GetSid().(uuid.UUID), txSid)
324-
if err != nil {
325-
return err
319+
gtidSetItem := a.gtidItemMap.GetItem(binlogEntry.Coordinates.GetSid().(uuid.UUID))
320+
a.logger.Debug("gtidSetItem", "NRow", gtidSetItem.NRow)
321+
if gtidSetItem.NRow >= cleanupGtidExecutedLimit {
322+
err = a.cleanGtidExecuted(binlogEntry.Coordinates.GetSid().(uuid.UUID), txSid)
323+
if err != nil {
324+
return err
325+
}
326+
gtidSetItem.NRow = 1
326327
}
327-
gtidSetItem.NRow = 1
328+
gtidSetItem.NRow += 1
328329
}
329330

330-
gtidSetItem.NRow += 1
331331
if binlogEntry.Coordinates.GetSequenceNumber() == 0 {
332332
// MySQL 5.6: non mts
333333
if isBig {

0 commit comments

Comments
 (0)