Skip to content

Commit 792537b

Browse files
author
ffffwh
committed
debug WaitForAllCommitted
- add log - add test
1 parent c447bcb commit 792537b

File tree

4 files changed

+69
-17
lines changed

4 files changed

+69
-17
lines changed

driver/mysql/applier_gtid_executed.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,9 @@ func (a *GtidExecutedCreater) createTableGtidExecutedV4() error {
246246
}
247247

248248
func (a *ApplierIncr) cleanGtidExecuted(sid uuid.UUID, txSid string) error {
249-
a.logger.Debug("incr. cleanup before WaitForExecution")
250-
if !a.mtsManager.WaitForAllCommitted() {
249+
if !a.mtsManager.WaitForAllCommitted(a.logger.With("txSid", txSid)) {
251250
return nil // shutdown
252251
}
253-
a.logger.Debug("incr. cleanup after WaitForExecution")
254252

255253
intervalStr := func() string {
256254
a.gtidSetLock.RLock()

driver/mysql/applier_incr.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -347,11 +347,9 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.EntryContext) (err error) {
347347
} else {
348348
if binlogEntry.Index == 0 {
349349
if rotated {
350-
a.logger.Debug("binlog rotated. WaitForAllCommitted before", "file", a.replayingBinlogFile)
351-
if !a.mtsManager.WaitForAllCommitted() {
350+
if !a.mtsManager.WaitForAllCommitted(a.logger.With("rotate", a.replayingBinlogFile)) {
352351
return nil // TODO shutdown
353352
}
354-
a.logger.Debug("binlog rotated. WaitForAllCommitted after", "file", a.replayingBinlogFile)
355353
a.mtsManager.lastCommitted = 0
356354
a.mtsManager.lastEnqueue = 0
357355
a.wsManager.resetCommonParent(0)
@@ -376,12 +374,11 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.EntryContext) (err error) {
376374
hasDDL := binlogEntry.HasDDL()
377375
inMiddleDDL := hasDDL || a.prevDDL // DDL must be executed separately
378376
if inMiddleDDL || isBig {
379-
a.logger.Info("WaitForAllCommitted",
380-
"gno", txGno, "seq", binlogEntry.Coordinates.GetSequenceNumber(),
381-
"lc", binlogEntry.Coordinates.GetLastCommit(), "leq", a.mtsManager.lastEnqueue,
377+
if !a.mtsManager.WaitForAllCommitted(a.logger.With("gno", txGno,
378+
"seq", binlogEntry.Coordinates.GetSequenceNumber(),
379+
"lc", binlogEntry.Coordinates.GetLastCommit(),
382380
"hasDDL", hasDDL, "prevDDL", a.prevDDL,
383-
"bigtx", isBig, "index", binlogEntry.Index)
384-
if !a.mtsManager.WaitForAllCommitted() {
381+
"bigtx", isBig, "index", binlogEntry.Index)) {
385382
return nil // shutdown
386383
}
387384
}

driver/mysql/applier_incr_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,44 @@
11
package mysql
22

3+
import (
4+
"github.com/hashicorp/go-hclog"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestMtsManager(t *testing.T) {
10+
logger := hclog.Default()
11+
shutdownCh := make(chan struct{})
12+
defer close(shutdownCh)
13+
14+
mm := NewMtsManager(shutdownCh, logger)
15+
go mm.LcUpdater()
16+
17+
doneCh := make(chan struct{})
18+
go func() {
19+
mm.WaitForExecution0(1, 0)
20+
logger.Info("wait 1 0")
21+
mm.Executed0(1)
22+
mm.WaitForExecution0(2, 1)
23+
logger.Info("wait 2 1")
24+
mm.Executed0(2)
25+
mm.WaitForExecution0(3, 2)
26+
logger.Info("wait 3 2")
27+
mm.Executed0(3)
28+
29+
// BinlogEntry was resent
30+
mm.WaitForExecution0(2, 1)
31+
mm.Executed0(2)
32+
33+
mm.WaitForAllCommitted(logger)
34+
close(doneCh)
35+
}()
36+
37+
tm := time.NewTimer(10 * time.Second)
38+
select {
39+
case <-doneCh:
40+
t.Log("case finished")
41+
case <-tm.C:
42+
t.Fatal("might be stuck")
43+
}
44+
}

driver/mysql/applier_mts.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"container/heap"
55
"hash/fnv"
66
"sync/atomic"
7+
"time"
78

89
"github.com/actiontech/dtle/driver/common"
910
"github.com/actiontech/dtle/g"
@@ -56,14 +57,18 @@ func NewMtsManager(shutdownCh chan struct{}, logger g.LoggerType) *MtsManager {
5657
}
5758

5859
// This function must be called sequentially.
59-
func (mm *MtsManager) WaitForAllCommitted() bool {
60-
g.Logger.Debug("WaitForAllCommitted", "lc", mm.lastCommitted, "le", mm.lastEnqueue)
60+
func (mm *MtsManager) WaitForAllCommitted(logger g.LoggerType) bool {
61+
t := time.NewTimer(30 * time.Second)
62+
defer t.Stop()
63+
6164
for {
62-
if mm.lastCommitted == mm.lastEnqueue {
65+
if atomic.LoadInt64(&mm.lastCommitted) == mm.lastEnqueue {
6366
return true
6467
}
6568

6669
select {
70+
case <-t.C: // this will only be triggered once
71+
logger.Warn("WaitForAllCommitted has been stuck for 30s")
6772
case <-mm.shutdownCh:
6873
return false
6974
case <-mm.updated:
@@ -75,15 +80,21 @@ func (mm *MtsManager) WaitForAllCommitted() bool {
7580
// block for waiting. return true for can_execute, false for abortion.
7681
// This function must be called sequentially.
7782
func (mm *MtsManager) WaitForExecution(binlogEntry *common.DataEntry) bool {
78-
mm.lastEnqueue = binlogEntry.Coordinates.GetSequenceNumber()
83+
return mm.WaitForExecution0(
84+
binlogEntry.Coordinates.GetSequenceNumber(),
85+
binlogEntry.Coordinates.(*common.MySQLCoordinateTx).LastCommitted)
86+
}
87+
88+
func (mm *MtsManager) WaitForExecution0(seq int64, lc int64) bool {
89+
mm.lastEnqueue = seq
7990

8091
if mm.forceMts {
8192
return true
8293
}
8394

8495
for {
8596
currentLC := atomic.LoadInt64(&mm.lastCommitted)
86-
if currentLC >= binlogEntry.Coordinates.(*common.MySQLCoordinateTx).LastCommitted {
97+
if currentLC >= lc {
8798
return true
8899
}
89100

@@ -130,10 +141,14 @@ func (mm *MtsManager) LcUpdater() {
130141
}
131142

132143
func (mm *MtsManager) Executed(binlogEntry *common.DataEntry) {
144+
mm.Executed0(binlogEntry.Coordinates.GetSequenceNumber())
145+
}
146+
147+
func (mm *MtsManager) Executed0(seq int64) {
133148
select {
134149
case <-mm.shutdownCh:
135150
return
136-
case mm.chExecuted <- binlogEntry.Coordinates.GetSequenceNumber():
151+
case mm.chExecuted <- seq:
137152
}
138153
}
139154

0 commit comments

Comments
 (0)