Skip to content

Commit 6075dea

Browse files
author
ffffwh
committed
swap ColumnMapFrom, ColumnMapTo for a reversed job #1053
1 parent 87c9565 commit 6075dea

File tree

2 files changed

+36
-13
lines changed

2 files changed

+36
-13
lines changed

api/handler/v2/job.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,16 @@ import (
1111
"strings"
1212
"time"
1313

14-
mysql "github.com/actiontech/dtle/driver/mysql"
15-
14+
nomadApi "github.com/hashicorp/nomad/api"
1615
"github.com/hashicorp/nomad/nomad/structs"
16+
"github.com/labstack/echo/v4"
1717

18-
"github.com/actiontech/dtle/driver/kafka"
19-
18+
"github.com/actiontech/dtle/api/handler"
19+
"github.com/actiontech/dtle/api/models"
2020
"github.com/actiontech/dtle/driver/common"
21-
21+
"github.com/actiontech/dtle/driver/kafka"
22+
mysql "github.com/actiontech/dtle/driver/mysql"
2223
"github.com/actiontech/dtle/g"
23-
24-
"github.com/actiontech/dtle/api/models"
25-
26-
"github.com/actiontech/dtle/api/handler"
27-
nomadApi "github.com/hashicorp/nomad/api"
28-
"github.com/labstack/echo/v4"
2924
)
3025

3126
// @Id MigrationJobListV2
@@ -1955,6 +1950,11 @@ func ReverseJobV2(c echo.Context, filterJobType DtleJobType) error {
19551950
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(err))
19561951
}
19571952

1953+
if originalJob.BasicTaskProfile.Configuration.SrcConfig.MysqlSrcTaskConfig.TwoWaySync {
1954+
return c.JSON(http.StatusConflict, models.BuildBaseResp(fmt.Errorf(
1955+
"job_id=%v; job can't be reversed with TwoWaySync = true", reqParam.JobId)))
1956+
}
1957+
19581958
reverseJobParam := new(models.CreateOrUpdateMysqlToMysqlJobParamV2)
19591959
reverseJobParam.JobId = fmt.Sprintf("%s-%s", "reverse", consulJobItem.JobId)
19601960
reverseJobParam.TaskStepName = mysql.JobIncrCopy
@@ -1982,6 +1982,15 @@ func ReverseJobV2(c echo.Context, filterJobType DtleJobType) error {
19821982
TwoWaySyncGtid: originalJob.BasicTaskProfile.Configuration.SrcConfig.MysqlSrcTaskConfig.TwoWaySyncGtid,
19831983
},
19841984
}
1985+
for _, dbItem := range reverseJobParam.SrcTask.ReplicateDoDb {
1986+
for _, tbItem := range dbItem.Tables {
1987+
if len(tbItem.ColumnMapFrom) > 0 && len(tbItem.ColumnMapTo) == 0 {
1988+
return c.JSON(http.StatusConflict, models.BuildBaseResp(fmt.Errorf(
1989+
"job_id=%v; job can't be reversed with ColumnMapFrom not matching ColumnMapTo", reqParam.JobId)))
1990+
}
1991+
tbItem.ColumnMapFrom, tbItem.ColumnMapTo = tbItem.ColumnMapTo, tbItem.ColumnMapFrom
1992+
}
1993+
}
19851994
reverseJobParam.DestTask = &models.DestTaskConfig{
19861995
TaskName: common.TaskTypeDest,
19871996
ConnectionConfig: &originalJob.BasicTaskProfile.ConnectionInfo.SrcDataBase,

driver/mysql/applier.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,23 @@ func (a *Applier) Run() {
304304
Subject: a.subject + "_dtrev",
305305
StateDir: a.stateDir,
306306
}
307-
var cfg2 = *a.mysqlContext
307+
// cfg2 will be a deepcopy of a.mysqlContext
308+
cfg2, err := a.storeManager.GetConfig(a.subject)
309+
if err != nil {
310+
a.onError(common.TaskStateDead, errors.Wrap(err, "GetConfig"))
311+
return
312+
}
308313
cfg2.SrcConnectionConfig = a.mysqlContext.DestConnectionConfig
309314
cfg2.DestConnectionConfig = a.mysqlContext.SrcConnectionConfig
315+
for _, dbItem := range cfg2.ReplicateDoDb {
316+
for _, tbItem := range dbItem.Tables {
317+
if len(tbItem.ColumnMapFrom) > 0 && len(tbItem.ColumnMapTo) == 0 {
318+
a.onError(common.TaskStateDead, errors.Wrap(err, "GetConfig"))
319+
return
320+
}
321+
tbItem.ColumnMapFrom, tbItem.ColumnMapTo = tbItem.ColumnMapTo, tbItem.ColumnMapFrom
322+
}
323+
}
310324

311325
if strings.ToLower(a.mysqlContext.TwoWaySyncGtid) == "auto" {
312326
cfg2.AutoGtid = true
@@ -318,7 +332,7 @@ func (a *Applier) Run() {
318332
cfg2.TwoWaySync = false
319333
cfg2.TwoWaySyncGtid = ""
320334

321-
a.revExtractor, err = NewExtractor(execCtx2, &cfg2, a.logger, a.storeManager, a.waitCh, a.ctx)
335+
a.revExtractor, err = NewExtractor(execCtx2, cfg2, a.logger, a.storeManager, a.waitCh, a.ctx)
322336
if err != nil {
323337
a.onError(common.TaskStateDead, errors.Wrap(err, "reversed Extractor"))
324338
return

0 commit comments

Comments
 (0)