99
1010 "github.com/actiontech/dtle/api/handler"
1111 "github.com/actiontech/dtle/api/models"
12- mysql "github.com/actiontech/dtle/driver"
13- nomadApi "github.com/hashicorp/nomad/api"
1412 "github.com/labstack/echo/v4"
1513)
1614
@@ -31,143 +29,45 @@ func GetTaskProgressV2(c echo.Context) error {
3129 return c .JSON (http .StatusInternalServerError , models .BuildBaseResp (err ))
3230 }
3331
34- targetNomadAddr := reqParam .NomadHttpAddress
35- if targetNomadAddr == "" {
36- // find out the node that the task is running
37- logger .Info ("find out the node that the task is running" )
38- url := handler .BuildUrl ("/v1/allocations" )
39- logger .Info ("invoke nomad api begin" , "url" , url )
40- nomadAllocs := []nomadApi.Allocation {}
41- if err := handler .InvokeApiWithKvData (http .MethodGet , url , nil , & nomadAllocs ); nil != err {
42- return c .JSON (http .StatusInternalServerError , models .BuildBaseResp (fmt .Errorf ("invoke nomad api %v failed: %v" , url , err )))
43- }
44- logger .Info ("invoke nomad api finished" )
45- nodeId := ""
46- for _ , alloc := range nomadAllocs {
47- if alloc .ID == reqParam .AllocationId {
48- nodeId = alloc .NodeID
49- break
50- }
51- }
52- if nodeId == "" {
53- return c .JSON (http .StatusInternalServerError , models .BuildBaseResp (fmt .Errorf ("cannot find out which node the allocation is running on" )))
54- }
55- url = handler .BuildUrl (fmt .Sprintf ("/v1/node/%v" , nodeId ))
56- logger .Info ("invoke nomad api begin" , "url" , url )
57- nomadNode := nomadApi.Node {}
58- if err := handler .InvokeApiWithKvData (http .MethodGet , url , nil , & nomadNode ); nil != err {
59- return c .JSON (http .StatusInternalServerError , models .BuildBaseResp (fmt .Errorf ("invoke nomad api %v failed: %v" , url , err )))
60- }
61- logger .Info ("invoke nomad api finished" )
62- targetNomadAddr = nomadNode .HTTPAddr
63- }
32+ logger .Warn ("/v2/monitor/task is unimplemented, returning dummy data" , "AllocationId" , reqParam .AllocationId )
6433
65- targetHost , _ , err := net .SplitHostPort (targetNomadAddr )
66- if err != nil {
67- return c .JSON (http .StatusInternalServerError , models .BuildBaseResp (fmt .Errorf ("get target host failed: %v" , err )))
68- }
69- logger .Info ("got target host" , "targetHost" , targetHost )
70- selfNomadHost , _ , err := net .SplitHostPort (handler .ApiAddr )
71- if err != nil {
72- return c .JSON (http .StatusInternalServerError , models .BuildBaseResp (fmt .Errorf ("get self nomad host failed: %v" , err )))
73- }
34+ //storeManager, err := common.NewStoreManager([]string{handler.ConsulAddr}, logger)
35+ //if err != nil {
36+ // return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("consul_addr=%v; connect to consul: failed: %v", handler.ConsulAddr, err)))
37+ //}
38+
39+ //storeManager.GetGtidForJob(reqParam.)
7440
7541 res := models.GetTaskProgressRespV2 {
7642 BaseResp : models .BuildBaseResp (nil ),
7743 }
78- if targetHost != selfNomadHost {
79- logger .Info ("forwarding..." , "targetHost" , targetHost )
80- // forward
81- // invoke http://%v/v1/agent/self to get api_addr
82- url := fmt .Sprintf ("http://%v/v1/agent/self" , targetNomadAddr )
83- nomadAgentSelf := nomadApi.AgentSelf {}
84- if err := handler .InvokeApiWithKvData (http .MethodGet , url , nil , & nomadAgentSelf ); err != nil {
85- return c .JSON (http .StatusInternalServerError , models .BuildBaseResp (fmt .Errorf ("invoke nomad api %v failed: %v" , url , err )))
86- }
87-
88- _ , targetPort , err := getApiAddrFromAgentConfig (nomadAgentSelf .Config )
89- if err != nil {
90- return c .JSON (http .StatusInternalServerError , models .BuildBaseResp (fmt .Errorf ("get target host failed: %v" , err )))
91- }
92- forwardAddr := fmt .Sprintf ("%s:%s" , targetHost , targetPort )
93- logger .Info ("forwarding..." , "forwardAddr" , forwardAddr )
94-
95- url = fmt .Sprintf ("http://%v/v2/monitor/task" , forwardAddr )
96- args := map [string ]string {
97- "allocation_id" : reqParam .AllocationId ,
98- "task_name" : reqParam .TaskName ,
99- "nomad_address" : targetNomadAddr ,
100- }
101- logger .Info ("forwarding... invoke target dtle api begin" , "url" , url )
102- if err := handler .InvokeApiWithKvData (http .MethodGet , url , args , & res , c .Request ().Header ); nil != err {
103- return c .JSON (http .StatusInternalServerError , models .BuildBaseResp (fmt .Errorf ("forward api %v failed: %v" , url , err )))
104- }
105- logger .Info ("forwarding... invoke target dtle api finished" )
106- } else {
107- taskStatus , ok , err := mysql .AllocIdTaskNameToTaskHandler .GetTaskStatistics (reqParam .AllocationId , reqParam .TaskName )
108- if nil != err {
109- return c .JSON (http .StatusInternalServerError , models .BuildBaseResp (fmt .Errorf ("get task stats failed: %v. allocation_id=%v task_name=%v" , err , reqParam .AllocationId , reqParam .TaskName )))
110- } else if ! ok {
111- return c .JSON (http .StatusInternalServerError , models .BuildBaseResp (fmt .Errorf ("cannot find the task. allocation_id=%v task_name=%v" , reqParam .AllocationId , reqParam .TaskName )))
112- }
113-
114- // build response struct
115- var currentCoordinates * models.CurrentCoordinates
116- var delayCount * models.DelayCount
117- var throughputStat * models.ThroughputStat
118- if taskStatus .CurrentCoordinates != nil {
119- currentCoordinates = & models.CurrentCoordinates {
120- File : taskStatus .CurrentCoordinates .File ,
121- Position : taskStatus .CurrentCoordinates .Position ,
122- GtidSet : taskStatus .CurrentCoordinates .GtidSet ,
123- RelayMasterLogFile : taskStatus .CurrentCoordinates .RelayMasterLogFile ,
124- ReadMasterLogPos : taskStatus .CurrentCoordinates .ReadMasterLogPos ,
125- RetrievedGtidSet : taskStatus .CurrentCoordinates .RetrievedGtidSet ,
126- }
127- }
128-
129- if taskStatus .DelayCount != nil {
130- delayCount = & models.DelayCount {
131- Num : taskStatus .DelayCount .Num ,
132- Time : taskStatus .DelayCount .Time ,
133- }
134- }
135-
136- if taskStatus .ThroughputStat != nil {
137- throughputStat = & models.ThroughputStat {
138- Num : taskStatus .ThroughputStat .Num ,
139- Time : taskStatus .ThroughputStat .Time ,
140- }
141- }
142-
143- res .TaskStatus = & models.TaskProgress {
144- CurrentCoordinates : currentCoordinates ,
145- DelayCount : delayCount ,
146- ProgressPct : taskStatus .ProgressPct ,
147- ExecMasterRowCount : taskStatus .ExecMasterRowCount ,
148- ExecMasterTxCount : taskStatus .ExecMasterTxCount ,
149- ReadMasterRowCount : taskStatus .ReadMasterRowCount ,
150- ReadMasterTxCount : taskStatus .ReadMasterTxCount ,
151- ETA : taskStatus .ETA ,
152- Backlog : taskStatus .Backlog ,
153- ThroughputStat : throughputStat ,
154- NatsMsgStat : & models.NatsMessageStatistics {
155- InMsgs : taskStatus .MsgStat .InMsgs ,
156- OutMsgs : taskStatus .MsgStat .OutMsgs ,
157- InBytes : taskStatus .MsgStat .InBytes ,
158- OutBytes : taskStatus .MsgStat .OutBytes ,
159- Reconnects : taskStatus .MsgStat .Reconnects ,
160- },
161- BufferStat : & models.BufferStat {
162- BinlogEventQueueSize : taskStatus .BufferStat .BinlogEventQueueSize ,
163- ExtractorTxQueueSize : taskStatus .BufferStat .ExtractorTxQueueSize ,
164- ApplierTxQueueSize : taskStatus .BufferStat .ApplierTxQueueSize ,
165- SendByTimeout : taskStatus .BufferStat .SendByTimeout ,
166- SendBySizeFull : taskStatus .BufferStat .SendBySizeFull ,
167- },
168- Stage : taskStatus .Stage ,
169- Timestamp : taskStatus .Timestamp ,
170- }
44+ res .TaskStatus = & models.TaskProgress {
45+ CurrentCoordinates : & models.CurrentCoordinates {},
46+ DelayCount : & models.DelayCount {},
47+ //ProgressPct: taskStatus.ProgressPct,
48+ ExecMasterRowCount : 0 ,
49+ ExecMasterTxCount : 0 ,
50+ ReadMasterRowCount : 0 ,
51+ ReadMasterTxCount : 0 ,
52+ //ETA: taskStatus.ETA,
53+ //Backlog: taskStatus.Backlog,
54+ //ThroughputStat: throughputStat,
55+ //NatsMsgStat: &models.NatsMessageStatistics{
56+ // InMsgs: taskStatus.MsgStat.InMsgs,
57+ // OutMsgs: taskStatus.MsgStat.OutMsgs,
58+ // InBytes: taskStatus.MsgStat.InBytes,
59+ // OutBytes: taskStatus.MsgStat.OutBytes,
60+ // Reconnects: taskStatus.MsgStat.Reconnects,
61+ //},
62+ //BufferStat: &models.BufferStat{
63+ // BinlogEventQueueSize: taskStatus.BufferStat.BinlogEventQueueSize,
64+ // ExtractorTxQueueSize: taskStatus.BufferStat.ExtractorTxQueueSize,
65+ // ApplierTxQueueSize: taskStatus.BufferStat.ApplierTxQueueSize,
66+ // SendByTimeout: taskStatus.BufferStat.SendByTimeout,
67+ // SendBySizeFull: taskStatus.BufferStat.SendBySizeFull,
68+ //},
69+ Stage : "TODO" ,
70+ //Timestamp: taskStatus.Timestamp,
17171 }
17272
17373 return c .JSON (http .StatusOK , & res )
0 commit comments