190 changes: 70 additions & 120 deletions cluster/cluster_fail.go
@@ -217,10 +217,10 @@ func (cluster *Cluster) MasterFailover(fail bool) bool {
cluster.LogSQL(logs, err, cluster.master.URL, "MasterFailover", config.LvlInfo, "Flush Log on new Master %d", ctbinlog)
}
time.Sleep(2 * time.Second)
ms, logs, err := dbhelper.GetMasterStatus(cluster.master.Conn, cluster.master.DBVersion)
cluster.master.FailoverMasterLogFile = ms.File
sms, logs, err := dbhelper.GetMasterStatus(cluster.master.Conn, cluster.master.DBVersion)
cluster.master.FailoverMasterLogFile = sms.File
cluster.master.FailoverMasterLogPos = "4"
crash.FailoverMasterLogFile = ms.File
crash.FailoverMasterLogFile = sms.File
crash.FailoverMasterLogPos = "4"
cluster.LogSQL(logs, err, cluster.master.URL, "MasterFailover", config.LvlInfo, "Backing up master pos %s %s", crash.FailoverMasterLogFile, crash.FailoverMasterLogPos)

@@ -312,6 +312,7 @@ func (cluster *Cluster) MasterFailover(fail bool) bool {
changemasteropt.Logpos = cluster.master.BinaryLogPos
changemasteropt.Retry = strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry)
changemasteropt.Heartbeat = strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime)
changemasteropt.RetryCount = strconv.Itoa(cluster.Conf.MasterRetryCount)
changemasteropt.SSL = cluster.Conf.ReplicationSSL
changemasteropt.Channel = cluster.Conf.MasterConn
changemasteropt.IsDelayed = cluster.oldMaster.IsDelayed
@@ -418,6 +419,45 @@ func (cluster *Cluster) MasterFailover(fail bool) bool {
return true
}

func (cluster *Cluster) pointSlaveToMasterWithMode(sl *ServerMonitor, mode string) (string, error) {
changemasteropt := cluster.GetChangeMasterBaseOptForSlave(sl, cluster.master, sl.IsDelayed)
changemasteropt.Mode = mode

return dbhelper.ChangeMaster(sl.Conn, changemasteropt, sl.DBVersion)
}

func (cluster *Cluster) pointSlaveToMasterPositional(sl *ServerMonitor) (string, error) {
changemasteropt := cluster.GetChangeMasterBaseOptForSlave(sl, cluster.master, sl.IsDelayed)

pseudoGTID, logs, err := sl.GetLastPseudoGTID()
cluster.LogSQL(logs, err, sl.URL, "MasterFailover", config.LvlErr, "Could not get pseudoGTID on slave %s, %s", sl.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Found pseudoGTID %s", pseudoGTID)
slFile, slPos, logs, err := sl.GetBinlogPosFromPseudoGTID(pseudoGTID)
cluster.LogSQL(logs, err, sl.URL, "MasterFailover", config.LvlErr, "Could not find pseudoGTID in slave %s, %s", sl.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Found Coordinates on slave %s, %s", slFile, slPos)
slSkip, logs, err := sl.GetNumberOfEventsAfterPos(slFile, slPos)
cluster.LogSQL(logs, err, sl.URL, "MasterFailover", config.LvlErr, "Could not find number of events after pseudoGTID in slave %s, %s", sl.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Found %d events to skip after coordinates on slave %s,%s", slSkip, slFile, slPos)

mFile, mPos, logs, err := cluster.master.GetBinlogPosFromPseudoGTID(pseudoGTID)
cluster.LogSQL(logs, err, cluster.master.URL, "MasterFailover", config.LvlErr, "Could not find pseudoGTID in master %s, %s", cluster.master.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Found coordinate on master %s ,%s", mFile, mPos)
mFile, mPos, logs, err = cluster.master.GetBinlogPosAfterSkipNumberOfEvents(mFile, mPos, slSkip)
cluster.LogSQL(logs, err, cluster.master.URL, "MasterFailover", config.LvlErr, "Could not skip event after pseudoGTID in master %s, %s", cluster.master.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Found skip coordinate on master %s, %s", mFile, mPos)

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Doing Positional switch of slave %s", sl.URL)
changemasteropt.Logfile = mFile
changemasteropt.Logpos = mPos
changemasteropt.Mode = "POSITIONAL"

return dbhelper.ChangeMaster(sl.Conn, changemasteropt, sl.DBVersion)
}

func (cluster *Cluster) pointSlaveToMasterMxsNoGtid(sl *ServerMonitor) (string, error) {
return dbhelper.ChangeMaster(sl.Conn, cluster.GetChangeMasterBaseOptForMxs(sl, cluster.master), sl.DBVersion)
}
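
For orientation, the per-slave branching in SwitchSlavesToMaster below reduces to picking one of these helpers. A condensed sketch of that selection, using only names that appear in this diff (logging and error handling omitted):

// Sketch only: condensed from the branches in SwitchSlavesToMaster below.
switch {
case !sl.HasMariaDBGTID() && !cluster.master.HasMySQLGTID():
	// No GTID at all: pseudo-GTID positional repoint, or stay pointed at the dead master.
	if cluster.Conf.AutorejoinSlavePositionalHeartbeat {
		logs, changeMasterErr = cluster.pointSlaveToMasterPositional(sl)
	} else {
		sl.SetMaintenance()
	}
case cluster.oldMaster.DBVersion.IsMySQLOrPerconaGreater57() && cluster.master.HasMySQLGTID():
	logs, changeMasterErr = cluster.pointSlaveToMasterWithMode(sl, "MASTER_AUTO_POSITION")
case !cluster.Conf.MxsBinlogOn:
	// MariaDB: all cases use GTID.
	logs, changeMasterErr = cluster.pointSlaveToMasterWithMode(sl, "SLAVE_POS")
case sl.MxsHaveGtid:
	// MaxScale binlog server with GTID support.
	logs, changeMasterErr = cluster.pointSlaveToMasterWithMode(sl, "SLAVE_POS")
default:
	logs, changeMasterErr = cluster.pointSlaveToMasterMxsNoGtid(sl)
}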

func (cluster *Cluster) SwitchSlavesToMaster(fail bool) {
var err error
var logs string
@@ -446,112 +486,26 @@ func (cluster *Cluster) SwitchSlavesToMaster(fail bool) {

var changeMasterErr error

var changemasteropt dbhelper.ChangeMasterOpt
changemasteropt.Host = cluster.master.Host
changemasteropt.Port = cluster.master.Port
changemasteropt.User = cluster.GetRplUser()
changemasteropt.Password = cluster.GetRplPass()
changemasteropt.Retry = strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry)
changemasteropt.Heartbeat = strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime)
changemasteropt.SSL = cluster.Conf.ReplicationSSL
changemasteropt.Channel = cluster.Conf.MasterConn
changemasteropt.IsDelayed = sl.IsDelayed
changemasteropt.Delay = strconv.Itoa(sl.ClusterGroup.Conf.HostsDelayedTime)
changemasteropt.PostgressDB = cluster.master.PostgressDB

// Not MariaDB and not using MySQL GTID, since 2.0 stop doing anything until pseudo GTID
if sl.HasMariaDBGTID() == false && cluster.master.HasMySQLGTID() == false {

if cluster.Conf.AutorejoinSlavePositionalHeartbeat == true {

pseudoGTID, logs, err := sl.GetLastPseudoGTID()
cluster.LogSQL(logs, err, sl.URL, "MasterFailover", config.LvlErr, "Could not get pseudoGTID on slave %s, %s", sl.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Found pseudoGTID %s", pseudoGTID)
slFile, slPos, logs, err := sl.GetBinlogPosFromPseudoGTID(pseudoGTID)
cluster.LogSQL(logs, err, sl.URL, "MasterFailover", config.LvlErr, "Could not find pseudoGTID in slave %s, %s", sl.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Found Coordinates on slave %s, %s", slFile, slPos)
slSkip, logs, err := sl.GetNumberOfEventsAfterPos(slFile, slPos)
cluster.LogSQL(logs, err, sl.URL, "MasterFailover", config.LvlErr, "Could not find number of events after pseudoGTID in slave %s, %s", sl.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Found %d events to skip after coordinates on slave %s,%s", slSkip, slFile, slPos)

mFile, mPos, logs, err := cluster.master.GetBinlogPosFromPseudoGTID(pseudoGTID)
cluster.LogSQL(logs, err, cluster.master.URL, "MasterFailover", config.LvlErr, "Could not find pseudoGTID in master %s, %s", cluster.master.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Found coordinate on master %s ,%s", mFile, mPos)
mFile, mPos, logs, err = cluster.master.GetBinlogPosAfterSkipNumberOfEvents(mFile, mPos, slSkip)
cluster.LogSQL(logs, err, cluster.master.URL, "MasterFailover", config.LvlErr, "Could not skip event after pseudoGTID in master %s, %s", cluster.master.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Found skip coordinate on master %s, %s", mFile, mPos)

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Doing Positional switch of slave %s", sl.URL)
changemasteropt.Logfile = mFile
changemasteropt.Logpos = mPos
changemasteropt.Mode = "POSITIONAL"
logs, changeMasterErr = dbhelper.ChangeMaster(sl.Conn, changemasteropt, sl.DBVersion)
logs, changeMasterErr = cluster.pointSlaveToMasterPositional(sl)
} else {
sl.SetMaintenance()
}
// do nothing: stay connected to the dead master, proceed with relay fix later

} else if cluster.oldMaster.DBVersion.IsMySQLOrPerconaGreater57() && cluster.master.HasMySQLGTID() == true {
logs, changeMasterErr = dbhelper.ChangeMaster(sl.Conn, dbhelper.ChangeMasterOpt{
Host: cluster.master.Host,
Port: cluster.master.Port,
User: cluster.GetRplUser(),
Password: cluster.GetRplPass(),
Retry: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry),
Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime),
Mode: "MASTER_AUTO_POSITION",
SSL: cluster.Conf.ReplicationSSL,
Channel: cluster.Conf.MasterConn,
IsDelayed: sl.IsDelayed,
Delay: strconv.Itoa(sl.ClusterGroup.Conf.HostsDelayedTime),
PostgressDB: cluster.master.PostgressDB,
}, sl.DBVersion)
logs, changeMasterErr = cluster.pointSlaveToMasterWithMode(sl, "MASTER_AUTO_POSITION")
} else if cluster.Conf.MxsBinlogOn == false {
//MariaDB all cases use GTID

logs, changeMasterErr = dbhelper.ChangeMaster(sl.Conn, dbhelper.ChangeMasterOpt{
Host: cluster.master.Host,
Port: cluster.master.Port,
User: cluster.GetRplUser(),
Password: cluster.GetRplPass(),
Retry: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry),
Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime),
Mode: "SLAVE_POS",
SSL: cluster.Conf.ReplicationSSL,
Channel: cluster.Conf.MasterConn,
IsDelayed: sl.IsDelayed,
Delay: strconv.Itoa(sl.ClusterGroup.Conf.HostsDelayedTime),
PostgressDB: cluster.master.PostgressDB,
}, sl.DBVersion)
logs, changeMasterErr = cluster.pointSlaveToMasterWithMode(sl, "SLAVE_POS")
} else { // We deduce we are in MaxScale binlog server, which may or may not support GTID

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Pointing relay to the new master: %s:%s", cluster.master.Host, cluster.master.Port)
if sl.MxsHaveGtid {
logs, changeMasterErr = dbhelper.ChangeMaster(sl.Conn, dbhelper.ChangeMasterOpt{
Host: cluster.master.Host,
Port: cluster.master.Port,
User: cluster.GetRplUser(),
Password: cluster.GetRplPass(),
Retry: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry),
Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime),
Mode: "SLAVE_POS",
SSL: cluster.Conf.ReplicationSSL,
Channel: cluster.Conf.MasterConn,
IsDelayed: sl.IsDelayed,
Delay: strconv.Itoa(sl.ClusterGroup.Conf.HostsDelayedTime),
PostgressDB: cluster.master.PostgressDB,
}, sl.DBVersion)
logs, changeMasterErr = cluster.pointSlaveToMasterWithMode(sl, "SLAVE_POS")
} else {
logs, changeMasterErr = dbhelper.ChangeMaster(sl.Conn, dbhelper.ChangeMasterOpt{
Host: cluster.master.Host,
Port: cluster.master.Port,
User: cluster.GetRplUser(),
Password: cluster.GetRplPass(),
Retry: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry),
Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime),
Mode: "MXS",
SSL: cluster.Conf.ReplicationSSL,
}, sl.DBVersion)
logs, changeMasterErr = cluster.pointSlaveToMasterMxsNoGtid(sl)
}
}
cluster.LogSQL(logs, changeMasterErr, sl.URL, "MasterFailover", config.LvlErr, "Change master failed on slave %s, %s", sl.URL, changeMasterErr)
@@ -623,6 +577,7 @@ func (cluster *Cluster) FailoverExtraMultiSource(oldMaster *ServerMonitor, NewMa
changemasteropt.Password = myparentrplpassword
changemasteropt.Retry = strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry)
changemasteropt.Heartbeat = strconv.Itoa(int(rep.SlaveHeartbeatPeriod))
changemasteropt.RetryCount = strconv.Itoa(cluster.Conf.MasterRetryCount)
changemasteropt.Logfile = rep.MasterLogFile.String
changemasteropt.Logpos = rep.ExecMasterLogPos.String
changemasteropt.SSL = cluster.Conf.ReplicationSSL
@@ -1481,36 +1436,31 @@ func (cluster *Cluster) CloseRing(oldMaster *ServerMonitor) error {
hasMyGTID := parent.HasMySQLGTID()

var changeMasterErr error
changeMasterOpt := dbhelper.ChangeMasterOpt{
Host: parent.Host,
Port: parent.Port,
User: cluster.GetRplUser(),
Password: cluster.GetRplPass(),
Retry: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry),
Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime),
RetryCount: strconv.Itoa(cluster.Conf.MasterRetryCount),
SSL: cluster.Conf.ReplicationSSL,
Channel: cluster.Conf.MasterConn,
PostgressDB: parent.PostgressDB,
}

// Not MariaDB and not using MySQL GTID, since 2.0 stop doing anything until pseudo GTID
if parent.DBVersion.IsMySQLOrPerconaGreater57() && hasMyGTID == true {
logs, changeMasterErr = dbhelper.ChangeMaster(child.Conn, dbhelper.ChangeMasterOpt{
Host: parent.Host,
Port: parent.Port,
User: cluster.GetRplUser(),
Password: cluster.GetRplPass(),
Retry: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry),
Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime),
Mode: "",
SSL: cluster.Conf.ReplicationSSL,
Channel: cluster.Conf.MasterConn,
PostgressDB: parent.PostgressDB,
}, child.DBVersion)
if parent.DBVersion.IsMySQLOrPerconaGreater57() {
if hasMyGTID {
changeMasterOpt.Mode = "MASTER_AUTO_POSITION"
} else {
changeMasterOpt.Mode = ""
}
logs, changeMasterErr = dbhelper.ChangeMaster(child.Conn, changeMasterOpt, child.DBVersion)
} else {
//MariaDB all cases use GTID

logs, changeMasterErr = dbhelper.ChangeMaster(child.Conn, dbhelper.ChangeMasterOpt{
Host: parent.Host,
Port: parent.Port,
User: cluster.GetRplUser(),
Password: cluster.GetRplPass(),
Retry: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry),
Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime),
Mode: "SLAVE_POS",
SSL: cluster.Conf.ReplicationSSL,
Channel: cluster.Conf.MasterConn,
PostgressDB: parent.PostgressDB,
}, child.DBVersion)
changeMasterOpt.Mode = "SLAVE_POS"
logs, changeMasterErr = dbhelper.ChangeMaster(child.Conn, changeMasterOpt, child.DBVersion)
}

cluster.LogSQL(logs, changeMasterErr, child.URL, "MasterFailover", config.LvlErr, "Could not change master on server %s, %s", child.URL, changeMasterErr)
53 changes: 53 additions & 0 deletions cluster/cluster_get.go
@@ -1770,3 +1770,56 @@ func (cluster *Cluster) GetDeprecatedKeyMap() map[string]bool {
}
return keys
}

func (cluster *Cluster) GetChangeMasterBaseOptForSlave(sl *ServerMonitor, master *ServerMonitor, delayed bool) dbhelper.ChangeMasterOpt {
opt := dbhelper.ChangeMasterOpt{
Host: master.Host,
Port: master.Port,
User: cluster.GetRplUser(),
Password: cluster.GetRplPass(),
Retry: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry),
Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime),
RetryCount: strconv.Itoa(cluster.Conf.MasterRetryCount),
SSL: cluster.Conf.ReplicationSSL,
Channel: cluster.Conf.MasterConn,
IsDelayed: delayed,
PostgressDB: master.PostgressDB,
}

if delayed {
opt.Delay = strconv.Itoa(cluster.Conf.HostsDelayedTime)
}

return opt
}
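
Call sites build this shared base and then override only what differs. A minimal usage sketch, matching the pointSlaveToMasterWithMode helper in cluster_fail.go:

// Sketch: reuse the base options and set only the replication mode.
opt := cluster.GetChangeMasterBaseOptForSlave(sl, cluster.master, sl.IsDelayed)
opt.Mode = "SLAVE_POS" // or "MASTER_AUTO_POSITION", "POSITIONAL", ...
logs, changeMasterErr := dbhelper.ChangeMaster(sl.Conn, opt, sl.DBVersion)
cluster.LogSQL(logs, changeMasterErr, sl.URL, "MasterFailover", config.LvlErr, "Change master failed on slave %s, %s", sl.URL, changeMasterErr)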

func (cluster *Cluster) GetChangeMasterBaseOptForReplGroup(sl *ServerMonitor) dbhelper.ChangeMasterOpt {
opt := dbhelper.ChangeMasterOpt{
User: cluster.GetRplUser(),
Password: cluster.GetRplPass(),
Retry: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry),
Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime),
RetryCount: strconv.Itoa(cluster.Conf.MasterRetryCount),
Mode: "GROUP_REPL",
Channel: "group_replication_recovery",
IsDelayed: sl.IsDelayed,
Delay: strconv.Itoa(cluster.Conf.HostsDelayedTime),
SSL: cluster.Conf.ReplicationSSL,
PostgressDB: sl.PostgressDB,
}

return opt
}
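
This variant presets Mode and the group_replication_recovery channel for MySQL Group Replication. A hypothetical call site, not part of this diff, might look like:

// Sketch only: assumed usage; the module tag "GroupReplication" is illustrative.
opt := cluster.GetChangeMasterBaseOptForReplGroup(sl)
logs, err := dbhelper.ChangeMaster(sl.Conn, opt, sl.DBVersion)
cluster.LogSQL(logs, err, sl.URL, "GroupReplication", config.LvlErr, "Change master for group replication failed on %s, %s", sl.URL, err)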

func (cluster *Cluster) GetChangeMasterBaseOptForMxs(sl *ServerMonitor, master *ServerMonitor) dbhelper.ChangeMasterOpt {
return dbhelper.ChangeMasterOpt{
Host: master.Host,
Port: master.Port,
User: cluster.GetRplUser(),
Password: cluster.GetRplPass(),
Retry: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatRetry),
Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime),
Mode: "MXS",
SSL: cluster.Conf.ReplicationSSL,
}
}
43 changes: 14 additions & 29 deletions cluster/cluster_staging.go
@@ -5,7 +5,6 @@ import (
"fmt"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"time"
@@ -467,17 +466,8 @@ func (cluster *Cluster) ReseedFromParentCluster(parent *Cluster, target *ServerM
cluster.LogSQL(logs, err, target.URL, "Rejoin", config.LvlErr, "Failed stop slave on server: %s %s", target.URL, err)
}

changeOpt := dbhelper.ChangeMasterOpt{
Host: pmaster.Host,
Port: pmaster.Port,
User: parent.GetRplUser(),
Password: parent.GetRplPass(),
Retry: strconv.Itoa(parent.Conf.ForceSlaveHeartbeatRetry),
Heartbeat: strconv.Itoa(parent.Conf.ForceSlaveHeartbeatTime),
Mode: "SLAVE_POS",
SSL: parent.Conf.ReplicationSSL,
Channel: parent.Conf.MasterConn,
}
changeOpt := parent.GetChangeMasterBaseOptForSlave(target, pmaster, false)
changeOpt.Mode = "SLAVE_POS"

if target.DBVersion.IsMySQLOrPercona() {
if target.HasMySQLGTID() {
@@ -530,28 +520,23 @@ func (cluster *Cluster) ReseedFromParentCluster(parent *Cluster, target *ServerM
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlWarn, "Task only updated in runtime. Error while writing to jobs table: %s", e2.Error())
}
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlErr, "Reseed logical backup %s from parent cluster failed on %s", backtype, target.URL)
return "", err
}

} else {
if e2 := target.JobsUpdateState(task, "Reseed completed", 3, 1); e2 != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlWarn, "Task only updated in runtime. Error while writing to jobs table: %s", e2.Error())
}
if e2 := target.JobsUpdateState(task, "Reseed completed", 3, 1); e2 != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlWarn, "Task only updated in runtime. Error while writing to jobs table: %s", e2.Error())
}

if target.IsMaster() {
_, err2 := target.StartSlaveChannel(parent.Conf.MasterConn)
if err2 != nil {
cluster.LogSQL(logs, err, target.URL, "Rejoin", config.LvlErr, "Failed start slave on server: %s %s", target.URL, err)
} else {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlInfo, "Start slave on %s", target.URL)
}
if target.IsMaster() {
_, err2 := target.StartSlaveChannel(parent.Conf.MasterConn)
if err2 != nil {
cluster.LogSQL(logs, err, target.URL, "Rejoin", config.LvlErr, "Failed start slave on server: %s %s", target.URL, err)
} else {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlInfo, "Start slave on %s", target.URL)
}

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlInfo, "Reseed logical backup %s from parent cluster completed on %s", backtype, target.URL)

}

if err != nil {
return "", err
}
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlInfo, "Reseed logical backup %s from parent cluster completed on %s", backtype, target.URL)

return masterCurrentGTID, nil
}