Skip to content

Commit c447bcb

Browse files
author
ffffwh
committed
full copy: retry on deadlock #1055
1 parent 6075dea commit c447bcb

File tree

2 files changed

+40
-33
lines changed

2 files changed

+40
-33
lines changed

driver/mysql/applier.go

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"bytes"
1111
gosql "database/sql"
1212
"fmt"
13+
mysqldriver "github.com/go-sql-driver/mysql"
1314
"math"
1415
"strconv"
1516
"strings"
@@ -436,7 +437,7 @@ func (a *Applier) doFullCopy() {
436437
return
437438
case copyRows := <-a.dumpEntryQueue:
438439
//time.Sleep(20 * time.Second) // #348 stub
439-
if err = a.ApplyEventQueries(a.db, copyRows); err != nil {
440+
if err = a.ApplyEventQueries(copyRows); err != nil {
440441
return
441442
}
442443
atomic.AddInt64(a.memory1, -int64(copyRows.Size()))
@@ -839,7 +840,7 @@ func (a *Applier) ValidateGrants() error {
839840
return fmt.Errorf("user has insufficient privileges for applier. Needed:ALTER, CREATE, DROP, INDEX, REFERENCES, INSERT, DELETE, UPDATE, SELECT, TRIGGER ON *.*")
840841
}
841842

842-
func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *common.DumpEntry) (err error) {
843+
func (a *Applier) ApplyEventQueries(entry *common.DumpEntry) (err error) {
843844
a.logger.Debug("ApplyEventQueries", "schema", entry.TableSchema, "table", entry.TableName,
844845
"rows", len(entry.ValuesX))
845846

@@ -893,29 +894,18 @@ func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *common.DumpEntry) (err
893894

894895
queries = append(queries, entry.SqlMode, entry.DbSQL)
895896
queries = append(queries, entry.TbSQL...)
896-
tx, err := db.BeginTx(a.ctx, &gosql.TxOptions{})
897-
if err != nil {
898-
return err
899-
}
897+
898+
conn := a.dbs[0].Db
900899
nRows := int64(len(entry.ValuesX))
901-
defer func() {
902-
if err != nil {
903-
return
904-
}
905-
err = tx.Commit()
906-
if err == nil {
907-
atomic.AddInt64(&a.TotalRowsReplayed, nRows)
908-
}
909-
}()
910-
if _, err := tx.ExecContext(a.ctx, querySetFKChecksOff); err != nil {
900+
if _, err := conn.ExecContext(a.ctx, querySetFKChecksOff); err != nil {
911901
return err
912902
}
913903
execQuery := func(query string) error {
914904
a.logger.Debug("ApplyEventQueries. exec", "query", g.StrLim(query, 256))
915-
_, err := tx.ExecContext(a.ctx, query)
905+
_, err := conn.ExecContext(a.ctx, query)
916906
if err != nil {
917907
queryStart := g.StrLim(query, 10) // avoid printing sensitive information
918-
errCtx := errors.Wrapf(err, "tx.Exec. queryStart %v seq", queryStart)
908+
errCtx := errors.Wrapf(err, "tx.Exec. queryStart %v", queryStart)
919909
if !sql.IgnoreError(err) {
920910
a.logger.Error("ApplyEventQueries. exec error", "err", errCtx)
921911
return errCtx
@@ -972,13 +962,22 @@ func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *common.DumpEntry) (err
972962
// last rows or sql too large
973963

974964
if needInsert {
975-
err := execQuery(buf.String())
976-
buf.Reset()
977-
if err != nil {
978-
return err
965+
for iTry := 0; ; iTry++ {
966+
err := execQuery(buf.String())
967+
if errIsMysqlDeadlock(err) && iTry < a.mysqlContext.RetryTxLimit {
968+
a.logger.Info("found deadlock. will retry tx", "schema", entry.TableSchema, "table", entry.Table,
969+
"iTry", iTry)
970+
time.Sleep(retryTxDelay)
971+
continue
972+
} else if err != nil {
973+
return err
974+
}
975+
break
979976
}
977+
buf.Reset()
980978
}
981979
}
980+
atomic.AddInt64(&a.TotalRowsReplayed, nRows)
982981

983982
return nil
984983
}
@@ -1228,3 +1227,15 @@ func (a *Applier) updateDumpProgressLoop() {
12281227
time.Sleep(time.Duration(interval) * time.Second)
12291228
}
12301229
}
1230+
func errIsMysqlDeadlock(err error) bool {
1231+
if err != nil {
1232+
merr, isME := err.(*mysqldriver.MySQLError)
1233+
if isME {
1234+
return merr.Number ==sql.ErrLockDeadlock
1235+
} else {
1236+
return false
1237+
}
1238+
} else {
1239+
return false
1240+
}
1241+
}

driver/mysql/applier_incr.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
gomysql "github.com/go-mysql-org/go-mysql/mysql"
1919
"github.com/pkg/errors"
2020
uuid "github.com/satori/go.uuid"
21-
mysqldriver "github.com/go-sql-driver/mysql"
2221
)
2322

2423
const (
@@ -236,19 +235,16 @@ func (a *ApplierIncr) MtsWorker(workerIndex int) {
236235
logger.Debug("a binlogEntry MTS dequeue", "gno", entryContext.Entry.Coordinates.GetGNO())
237236
for iTry := 0; ; iTry++ {
238237
err := a.ApplyBinlogEvent(workerIndex, entryContext)
239-
if err != nil {
240-
if merr, isME := err.(*mysqldriver.MySQLError); isME {
241-
if merr.Number == sql.ErrLockDeadlock && iTry < a.mysqlContext.RetryTxLimit {
242-
logger.Info("found deadlock. will retry tx", "gno", entryContext.Entry.Coordinates.GetGNO(),
243-
"iTry", iTry)
244-
time.Sleep(retryTxDelay)
245-
continue
246-
}
247-
}
238+
if errIsMysqlDeadlock(err) && iTry < a.mysqlContext.RetryTxLimit {
239+
logger.Info("found deadlock. will retry tx", "gno", entryContext.Entry.Coordinates.GetGNO(),
240+
"iTry", iTry)
241+
time.Sleep(retryTxDelay)
242+
continue
243+
} else if err != nil {
248244
a.OnError(common.TaskStateDead, err) // TODO coordinate with other goroutine
249245
keepLoop = false
250246
}
251-
break;
247+
break
252248
}
253249
logger.Debug("after ApplyBinlogEvent.", "gno", entryContext.Entry.Coordinates.GetGNO())
254250
case <-t.C:

0 commit comments

Comments
 (0)