@@ -202,6 +202,7 @@ func (c *client) Start(ctx context.Context, cfg interface{}, clientNodes []clust
202202 }(connID )
203203 }
204204
205+ var valid , total int64
205206 // downstream validators
206207 for validatorIdx := 0 ; validatorIdx < validateConcurrency ; validatorIdx ++ {
207208 wg .Add (1 )
@@ -217,10 +218,18 @@ func (c *client) Start(ctx context.Context, cfg interface{}, clientNodes []clust
217218 waitCancel ()
218219 log .Info ("ddl synced" )
219220
221+ atomic .AddInt64 (& total , 1 )
222+
220223 endTs , err := getDDLEndTs (downstream , tblName )
221224 if err != nil {
222225 log .Fatalf ("[cdc-bank] get ddl end ts error %v" , err )
223226 }
227+ // endTs maybe empty due to unknown reason now, if we meet this accidentally, just ignore this round.
228+ if endTs == "" {
229+ continue
230+ }
231+
232+ atomic .AddInt64 (& valid , 1 )
224233
225234 txn , err := downstream .Begin ()
226235 if err != nil {
@@ -269,6 +278,11 @@ func (c *client) Start(ctx context.Context, cfg interface{}, clientNodes []clust
269278 }
270279 wg .Wait ()
271280
281+ if total == 0 {
282+ log .Warn ("[cdc-bank] finished, but total check round is 0" )
283+ } else {
284+ log .Infof ("[cdc-bank] finished, valid check round: %+v, total try round: %+v, ratio: %f" , valid , total , float64 (valid )/ float64 (total ))
285+ }
272286 return nil
273287}
274288
@@ -327,7 +341,7 @@ type dataRow struct {
327341 TblID int64
328342 RowCount int64
329343 StartTime string
330- EndTime string
344+ EndTime * string
331345 State string
332346}
333347
@@ -345,7 +359,11 @@ func getDDLEndTs(db *sql.DB, tableName string) (result string, err error) {
345359 return "" , err
346360 }
347361 if line .JobType == "create table" && line .TblName == tableName && line .State == "synced" {
348- return line .EndTime , nil
362+ if line .EndTime == nil {
363+ log .Warnf ("ddl end time is null, line=%+v" , line )
364+ return "" , nil
365+ }
366+ return * line .EndTime , nil
349367 }
350368 }
351369 return "" , errors .New (fmt .Sprintf ("cannot find in ddl history, tableName: %s" , tableName ))
0 commit comments