@@ -200,6 +200,7 @@ func (a *ApplierIncr) bigTxQueueExecutor() {
200200 for {
201201 item := <- a .bigTxEventQueue
202202 if item == nil {
203+ // chan closed in Shutdown()
203204 break
204205 }
205206
@@ -255,10 +256,11 @@ func (a *ApplierIncr) MtsWorker(workerIndex int) {
255256
256257func (a * ApplierIncr ) handleEntry (entryCtx * common.EntryContext ) (err error ) {
257258 binlogEntry := entryCtx .Entry
259+ isBig := binlogEntry .IsPartOfBigTx ()
258260 txGno := binlogEntry .Coordinates .GetGNO ()
259261
260262 if a .inBigTx && binlogEntry .Index == 0 {
261- a .logger .Info ("found resent BinlogEntry inBigTx " , "gno" , txGno )
263+ a .logger .Info ("bigtx: found resent BinlogEntry" , "gno" , txGno )
262264 // src is resending an earlier BinlogEntry
263265 _ , err = a .dbs [0 ].Db .ExecContext (a .ctx , "rollback" )
264266 if err != nil {
@@ -328,6 +330,9 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.EntryContext) (err error) {
328330 gtidSetItem .NRow += 1
329331 if binlogEntry .Coordinates .GetSequenceNumber () == 0 {
330332 // MySQL 5.6: non mts
333+ if isBig {
334+ a .inBigTx = true
335+ }
331336 err := a .setTableItemForBinlogEntry (entryCtx )
332337 if err != nil {
333338 return err
@@ -365,21 +370,21 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.EntryContext) (err error) {
365370 }
366371
367372 hasDDL := binlogEntry .HasDDL ()
368- // DDL must be executed separatedly
369- if hasDDL || a .prevDDL {
370- a .logger .Debug ("MTS found DDL. WaitForAllCommitted" ,
371- "gno" , txGno , "hasDDL" , hasDDL , "prevDDL" , a .prevDDL )
373+ inMiddleDDL := hasDDL || a .prevDDL // DDL must be executed separatedly
374+ if inMiddleDDL || isBig {
375+ a .logger .Info ("WaitForAllCommitted" ,
376+ "gno" , txGno , "seq" , binlogEntry .Coordinates .GetSequenceNumber (),
377+ "lc" , binlogEntry .Coordinates .GetLastCommit (), "leq" , a .mtsManager .lastEnqueue ,
378+ "hasDDL" , hasDDL , "prevDDL" , a .prevDDL ,
379+ "bigtx" , isBig , "index" , binlogEntry .Index )
372380 if ! a .mtsManager .WaitForAllCommitted () {
373381 return nil // shutdown
374382 }
375383 }
376384 a .prevDDL = hasDDL
377385
378- if binlogEntry . IsPartOfBigTx () {
386+ if isBig {
379387 a .inBigTx = true
380- if ! a .mtsManager .WaitForAllCommitted () {
381- return nil // shutdown
382- }
383388 a .wsManager .resetCommonParent (binlogEntry .Coordinates .GetSequenceNumber ())
384389 }
385390 }
@@ -389,17 +394,18 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.EntryContext) (err error) {
389394 return err
390395 }
391396
392- if ! binlogEntry . IsPartOfBigTx () && ! a .mysqlContext .UseMySQLDependency {
397+ if ! isBig && ! a .mysqlContext .UseMySQLDependency {
393398 newLC := a .wsManager .GatLastCommit (entryCtx , a .logger )
394399 binlogEntry .Coordinates .(* common.MySQLCoordinateTx ).LastCommitted = newLC
395400 a .logger .Debug ("WritesetManager" , "lc" , newLC , "seq" , binlogEntry .Coordinates .GetSequenceNumber (),
396401 "gno" , txGno )
397402 }
398403
399- if binlogEntry . IsPartOfBigTx () {
404+ if isBig {
400405 if binlogEntry .Index == 0 {
401406 a .mtsManager .lastEnqueue = binlogEntry .Coordinates .GetSequenceNumber ()
402407 }
408+ a .logger .Info ("bigtx ApplyBinlogEvent" , "gno" , txGno , "index" , binlogEntry .Index )
403409 err = a .ApplyBinlogEvent (0 , entryCtx )
404410 if err != nil {
405411 return err
@@ -521,6 +527,7 @@ func (a *ApplierIncr) prepareIfNilAndExecute(item *dmlExecItem, workerIdx int) (
521527// ApplyEventQueries applies multiple DML queries onto the dest table
522528func (a * ApplierIncr ) ApplyBinlogEvent (workerIdx int , binlogEntryCtx * common.EntryContext ) (err error ) {
523529 logger := a .logger .Named ("ApplyBinlogEvent" )
530+ binlogEntryCtx .Rows = 0 // count for logging
524531 binlogEntry := binlogEntryCtx .Entry
525532 defer atomic .AddInt64 (a .memory2 , - int64 (binlogEntry .Size ()))
526533
@@ -535,7 +542,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
535542 // Note: gtid_next cannot be set when there is an ongoing transaction.
536543 if a .mysqlContext .SetGtidNext {
537544 _ , err = dbApplier .Db .ExecContext (a .ctx , fmt .Sprintf ("set gtid_next = '%v:%v' /*dtle*/" ,
538- binlogEntry .Coordinates .GetSidStr (), binlogEntry . Coordinates . GetGNO () ))
545+ binlogEntry .Coordinates .GetSidStr (), gno ))
539546 if err != nil {
540547 return errors .Wrap (err , "set gtid_next" )
541548 }
@@ -563,7 +570,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
563570 _ , err = dbApplier .Db .ExecContext (a .ctx , query )
564571 if err != nil {
565572 errCtx := errors .Wrapf (err , "tx.Exec. gno %v queryBegin %v workerIdx %v" ,
566- binlogEntry . Coordinates . GetGNO () , g .StrLim (query , 10 ), workerIdx )
573+ gno , g .StrLim (query , 10 ), workerIdx )
567574 if sql .IgnoreError (err ) {
568575 logger .Warn ("Ignore error" , "err" , errCtx )
569576 return nil
@@ -577,7 +584,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
577584
578585 queueOrExec := func (item * dmlExecItem ) error {
579586 // TODO check if shutdown?
580- if ! a . noBigTxDMLPipe && a . inBigTx {
587+ if a . inBigTx && ! a . noBigTxDMLPipe {
581588 a .bigTxEventWg .Add (1 )
582589 select {
583590 case <- a .shutdownCh :
@@ -594,7 +601,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
594601 if a .HasShutdown () {
595602 break
596603 }
597- logger .Debug ("binlogEntry.Events" , "gno" , binlogEntry . Coordinates . GetGNO () , "event" , i )
604+ logger .Debug ("binlogEntry.Events" , "gno" , gno , "event" , i )
598605
599606 if event .DML == common .NotDML {
600607 var err error
@@ -683,6 +690,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
683690 switch event .DML {
684691 case common .InsertDML :
685692 nRows := len (event .Rows )
693+ binlogEntryCtx .Rows += nRows
686694 for i := 0 ; i < nRows ; {
687695 var pstmt * * gosql.Stmt
688696 var rows [][]interface {}
@@ -717,6 +725,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
717725 }
718726 }
719727 case common .DeleteDML :
728+ binlogEntryCtx .Rows += len (event .Rows )
720729 for _ , row := range event .Rows {
721730 pstmt := & tableItem .PsDelete [workerIdx ]
722731 query , uniqueKeyArgs , hasUK , err := sql .BuildDMLDeleteQuery (event .DatabaseName , event .TableName ,
@@ -734,15 +743,16 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
734743 case common .UpdateDML :
735744 if len (event .Rows ) % 2 != 0 {
736745 return fmt .Errorf ("bad update event. row number is not 2N %v gno %v" ,
737- len (event .Rows ), binlogEntry . Coordinates . GetGNO () )
746+ len (event .Rows ), gno )
738747 }
748+ binlogEntryCtx .Rows += len (event .Rows ) / 2
739749 for i := 0 ; i < len (event .Rows ); i += 2 {
740750 rowBefore := event .Rows [i ]
741751 rowAfter := event .Rows [i + 1 ]
742752
743753 if len (rowBefore ) == 0 && len (rowAfter ) == 0 {
744754 return fmt .Errorf ("bad update event. row number is not 2N %v gno %v" ,
745- len (event .Rows ), binlogEntry . Coordinates . GetGNO () )
755+ len (event .Rows ), gno )
746756 }
747757
748758 if len (rowBefore ) == 0 { // insert
@@ -800,27 +810,42 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
800810 timestamp = event .Timestamp
801811 atomic .AddUint64 (& a .appliedQueryCount , uint64 (1 ))
802812 }
813+ if a .inBigTx && ! a .noBigTxDMLPipe {
814+ a .logger .Info ("a.bigTxEventWg.Wait before" , "gno" , gno , "index" , binlogEntry .Index )
815+ }
803816 a .bigTxEventWg .Wait ()
804817 if a .HasShutdown () {
805818 return fmt .Errorf ("ApplyBinlogEvent: applier has been shutdown. gno %v" , gno )
806819 }
807820
808821 if binlogEntry .Final {
822+ isBigTx := binlogEntry .IsPartOfBigTx ()
809823 if ! a .SkipGtidExecutedTable && a .sourceType == "mysql" {
810824 if binlogEntry .IsOneStmtDDL () && a .mysqlContext .SetGtidNext {
811825 err1 := dbApplier .SetGtidNextAutomatic (a .ctx )
812826 if err1 != nil {
813827 err = errors .Wrapf (err1 , "restore gtid_next" )
814828 }
815829 }
816- logger .Debug ("insert gno" , "gno" , binlogEntry .Coordinates .GetGNO ())
830+
831+ if a .logTxCommit || isBigTx {
832+ logger .Info ("insert gno" , "gno" , gno , "bigtx" , isBigTx , "index" , binlogEntry .Index ,
833+ "rows" , binlogEntryCtx .Rows )
834+ } else {
835+ logger .Debug ("insert gno" , "gno" , gno , "rows" , binlogEntryCtx .Rows )
836+ }
837+
817838 _ , err = dbApplier .PsInsertExecutedGtid .ExecContext (a .ctx ,
818- a .subject , binlogEntry .Coordinates .GetSid ().(uuid.UUID ).Bytes (), binlogEntry . Coordinates . GetGNO () )
839+ a .subject , binlogEntry .Coordinates .GetSid ().(uuid.UUID ).Bytes (), gno )
819840 if err != nil {
820841 return errors .Wrap (err , "insert gno" )
821842 }
822843 }
823844
845+ if a .logTxCommit || isBigTx {
846+ logger .Info ("committing tx" , "gno" , gno , "bigtx" , isBigTx , "index" , binlogEntry .Index ,
847+ "rows" , binlogEntryCtx .Rows )
848+ }
824849 if _ , err := dbApplier .Db .ExecContext (a .ctx , "commit" ); err != nil {
825850 return errors .Wrap (err , "dbApplier.Tx.Commit" )
826851 } else {
@@ -830,12 +855,15 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
830855 if a .printTps {
831856 atomic .AddUint32 (& a .txLastNSeconds , 1 )
832857 }
833- if a .logTxCommit {
834- logger .Info ("applier tx committed" , "gno" , binlogEntry .Coordinates .GetGNO ())
858+ if a .logTxCommit || isBigTx {
859+ logger .Info ("applier tx committed" , "gno" , gno , "bigtx" , isBigTx , "index" , binlogEntry .Index ,
860+ "rows" , binlogEntryCtx .Rows )
835861 } else {
836- logger .Debug ("applier tx committed" , "gno" , binlogEntry . Coordinates . GetGNO () )
862+ logger .Debug ("applier tx committed" , "gno" , gno , "rows" , binlogEntryCtx . Rows )
837863 }
838864 atomic .AddUint32 (& a .appliedTxCount , 1 )
865+ } else {
866+ logger .Info ("uncommitted bigtx part" , "gno" , gno , "index" , binlogEntry .Index , "rows" , binlogEntryCtx .Rows )
839867 }
840868 a .EntryExecutedHook (binlogEntry )
841869
0 commit comments