1515 */
1616package com .github .shyiko .mysql .binlog ;
1717
18- import com .github .shyiko .mysql .binlog .event .*;
19- import com .github .shyiko .mysql .binlog .event .deserialization .*;
18+ import com .github .shyiko .mysql .binlog .event .AnnotateRowsEventData ;
19+ import com .github .shyiko .mysql .binlog .event .Event ;
20+ import com .github .shyiko .mysql .binlog .event .EventHeader ;
21+ import com .github .shyiko .mysql .binlog .event .EventHeaderV4 ;
22+ import com .github .shyiko .mysql .binlog .event .EventType ;
23+ import com .github .shyiko .mysql .binlog .event .GtidEventData ;
24+ import com .github .shyiko .mysql .binlog .event .MariadbGtidEventData ;
25+ import com .github .shyiko .mysql .binlog .event .MariadbGtidListEventData ;
26+ import com .github .shyiko .mysql .binlog .event .QueryEventData ;
27+ import com .github .shyiko .mysql .binlog .event .RotateEventData ;
28+ import com .github .shyiko .mysql .binlog .event .deserialization .ChecksumType ;
29+ import com .github .shyiko .mysql .binlog .event .deserialization .EventDataDeserializationException ;
30+ import com .github .shyiko .mysql .binlog .event .deserialization .EventDataDeserializer ;
31+ import com .github .shyiko .mysql .binlog .event .deserialization .EventDeserializer ;
2032import com .github .shyiko .mysql .binlog .event .deserialization .EventDeserializer .EventDataWrapper ;
33+ import com .github .shyiko .mysql .binlog .event .deserialization .GtidEventDataDeserializer ;
34+ import com .github .shyiko .mysql .binlog .event .deserialization .QueryEventDataDeserializer ;
35+ import com .github .shyiko .mysql .binlog .event .deserialization .RotateEventDataDeserializer ;
2136import com .github .shyiko .mysql .binlog .io .ByteArrayInputStream ;
2237import com .github .shyiko .mysql .binlog .jmx .BinaryLogClientMXBean ;
2338import com .github .shyiko .mysql .binlog .network .AuthenticationException ;
3449import com .github .shyiko .mysql .binlog .network .protocol .Packet ;
3550import com .github .shyiko .mysql .binlog .network .protocol .PacketChannel ;
3651import com .github .shyiko .mysql .binlog .network .protocol .ResultSetRowPacket ;
37- import com .github .shyiko .mysql .binlog .network .protocol .command .*;
52+ import com .github .shyiko .mysql .binlog .network .protocol .command .Command ;
53+ import com .github .shyiko .mysql .binlog .network .protocol .command .DumpBinaryLogCommand ;
54+ import com .github .shyiko .mysql .binlog .network .protocol .command .DumpBinaryLogGtidCommand ;
55+ import com .github .shyiko .mysql .binlog .network .protocol .command .PingCommand ;
56+ import com .github .shyiko .mysql .binlog .network .protocol .command .QueryCommand ;
57+ import com .github .shyiko .mysql .binlog .network .protocol .command .SSLRequestCommand ;
3858
3959import javax .net .ssl .SSLContext ;
4060import javax .net .ssl .TrustManager ;
@@ -115,14 +135,12 @@ public X509Certificate[] getAcceptedIssuers() {
115135 private volatile long connectionId ;
116136 private SSLMode sslMode = SSLMode .DISABLED ;
117137
118- private GtidSet gtidSet ;
119- private final Object gtidSetAccessLock = new Object ();
138+ protected GtidSet gtidSet ;
139+ protected final Object gtidSetAccessLock = new Object ();
120140 private boolean gtidSetFallbackToPurged ;
121141 private boolean useBinlogFilenamePositionInGtidMode ;
122142 private String gtid ;
123143 private boolean tx ;
124- private boolean isMariadb = false ;
125- private boolean mariadbSendAnnotateRowsEvent = false ;
126144
127145 private EventDeserializer eventDeserializer = new EventDeserializer ();
128146
@@ -132,7 +150,7 @@ public X509Certificate[] getAcceptedIssuers() {
132150 private SocketFactory socketFactory ;
133151 private SSLSocketFactory sslSocketFactory ;
134152
135- private volatile PacketChannel channel ;
153+ protected volatile PacketChannel channel ;
136154 private volatile boolean connected ;
137155 private volatile long masterServerId = -1 ;
138156
@@ -317,15 +335,14 @@ public void setGtidSet(String gtidSet) {
317335 this .binlogFilename = "" ;
318336 }
319337 synchronized (gtidSetAccessLock ) {
320- // mariadb GtidSet format will be domainId-serverId-sequence
321- if (gtidSet != null && !gtidSet .contains (":" )) {
322- this .gtidSet = new MariadbGtidSet (gtidSet );
323- } else {
324- this .gtidSet = gtidSet != null ? new GtidSet (gtidSet ) : null ;
325- }
338+ this .gtidSet = gtidSet != null ? buildGtidSet (gtidSet ) : null ;
326339 }
327340 }
328341
342+ protected GtidSet buildGtidSet (String gtidSet ) {
343+ return new GtidSet (gtidSet );
344+ }
345+
329346 /**
330347 * @see #setGtidSetFallbackToPurged(boolean)
331348 * @return whether gtid_purged is used as a fallback
@@ -488,19 +505,6 @@ public void setThreadFactory(ThreadFactory threadFactory) {
488505 this .threadFactory = threadFactory ;
489506 }
490507
491- public boolean isMariadbSendAnnotateRowsEvent () {
492- return mariadbSendAnnotateRowsEvent ;
493- }
494-
495- /**
496- * Only in Mariadb, if set true, the Slave server connects with the BINLOG_SEND_ANNOTATE_ROWS_EVENT flag (value is 2)
497- * in the COM_BINLOG_DUMP Slave Registration phase
498- * @param mariadbSendAnnotateRowsEvent
499- */
500- public void setMariadbSendAnnotateRowsEvent (boolean mariadbSendAnnotateRowsEvent ) {
501- this .mariadbSendAnnotateRowsEvent = mariadbSendAnnotateRowsEvent ;
502- }
503-
504508 /**
505509 * Connect to the replication stream. Note that this method blocks until disconnected.
506510 * @throws AuthenticationException if authentication fails
@@ -538,17 +542,8 @@ public void connect() throws IOException, IllegalStateException {
538542 channel .authenticationComplete ();
539543
540544 connectionId = greetingPacket .getThreadId ();
541- isMariadb = greetingPacket .getServerVersion ().toLowerCase ().contains ("mariadb" );
542545 if ("" .equals (binlogFilename )) {
543- synchronized (gtidSetAccessLock ) {
544- if (gtidSet != null && "" .equals (gtidSet .toString ()) && gtidSetFallbackToPurged ) {
545- if (isMariadb ) {
546- gtidSet = new MariadbGtidSet (fetchGtidPurged ());
547- } else {
548- gtidSet = new GtidSet (fetchGtidPurged ());
549- }
550- }
551- }
546+ setupGtidSet ();
552547 }
553548 if (binlogFilename == null ) {
554549 fetchBinlogFilenameAndPosition ();
@@ -597,13 +592,7 @@ public void connect() throws IOException, IllegalStateException {
597592 ensureEventDataDeserializer (EventType .ROTATE , RotateEventDataDeserializer .class );
598593 synchronized (gtidSetAccessLock ) {
599594 if (gtidSet != null ) {
600- ensureEventDataDeserializer (EventType .GTID , GtidEventDataDeserializer .class );
601- ensureEventDataDeserializer (EventType .QUERY , QueryEventDataDeserializer .class );
602- if (isMariadb ) {
603- ensureEventDataDeserializer (EventType .ANNOTATE_ROWS , AnnotateRowsEventDataDeserializer .class );
604- ensureEventDataDeserializer (EventType .MARIADB_GTID , MariadbGtidEventDataDeserializer .class );
605- ensureEventDataDeserializer (EventType .MARIADB_GTID_LIST , MariadbGtidListEventDataDeserializer .class );
606- }
595+ ensureGtidEventDataDeserializer ();
607596 }
608597 }
609598 listenForEventPackets ();
@@ -676,7 +665,7 @@ public Object call() throws Exception {
676665 };
677666 }
678667
679- private void checkError (byte [] packet ) throws IOException {
668+ protected void checkError (byte [] packet ) throws IOException {
680669 if (packet [0 ] == (byte ) 0xFF /* error */ ) {
681670 byte [] bytes = Arrays .copyOfRange (packet , 1 , packet .length );
682671 ErrorPacket errorPacket = new ErrorPacket (bytes );
@@ -720,7 +709,6 @@ private boolean tryUpgradeToSSL(GreetingPacket greetingPacket) throws IOExceptio
720709 return false ;
721710 }
722711
723-
724712 private void enableHeartbeat () throws IOException {
725713 channel .write (new QueryCommand ("set @master_heartbeat_period=" + heartbeatInterval * 1000000 ));
726714 byte [] statementResult = channel .read ();
@@ -735,35 +723,23 @@ private void setMasterServerId() throws IOException {
735723 }
736724 }
737725
738- private void requestBinaryLogStream () throws IOException {
726+ protected void requestBinaryLogStream () throws IOException {
739727 long serverId = blocking ? this .serverId : 0 ; // http://bugs.mysql.com/bug.php?id=71178
740728 Command dumpBinaryLogCommand ;
741729 synchronized (gtidSetAccessLock ) {
742730 if (gtidSet != null ) {
743- if (isMariadb ) {
744- channel .write (new QueryCommand ("SET @mariadb_slave_capability=4" ));
745- checkError (channel .read ());
746- channel .write (new QueryCommand ("SET @slave_connect_state = '" + gtidSet .toString () + "'" ));
747- checkError (channel .read ());
748- channel .write (new QueryCommand ("SET @slave_gtid_strict_mode = 0" ));
749- checkError (channel .read ());
750- channel .write (new QueryCommand ("SET @slave_gtid_ignore_duplicates = 0" ));
751- checkError (channel .read ());
752- dumpBinaryLogCommand = new DumpBinaryLogCommand (serverId , "" , 0L , isMariadbSendAnnotateRowsEvent ());
753- } else {
754- dumpBinaryLogCommand = new DumpBinaryLogGtidCommand (serverId ,
755- useBinlogFilenamePositionInGtidMode ? binlogFilename : "" ,
756- useBinlogFilenamePositionInGtidMode ? binlogPosition : 4 ,
757- gtidSet );
758- }
731+ dumpBinaryLogCommand = new DumpBinaryLogGtidCommand (serverId ,
732+ useBinlogFilenamePositionInGtidMode ? binlogFilename : "" ,
733+ useBinlogFilenamePositionInGtidMode ? binlogPosition : 4 ,
734+ gtidSet );
759735 } else {
760736 dumpBinaryLogCommand = new DumpBinaryLogCommand (serverId , binlogFilename , binlogPosition );
761737 }
762738 }
763739 channel .write (dumpBinaryLogCommand );
764740 }
765741
766- private void ensureEventDataDeserializer (EventType eventType ,
742+ protected void ensureEventDataDeserializer (EventType eventType ,
767743 Class <? extends EventDataDeserializer > eventDataDeserializerClass ) {
768744 EventDataDeserializer eventDataDeserializer = eventDeserializer .getEventDataDeserializer (eventType );
769745 if (eventDataDeserializer .getClass () != eventDataDeserializerClass &&
@@ -780,6 +756,10 @@ private void ensureEventDataDeserializer(EventType eventType,
780756 }
781757 }
782758
759+ protected void ensureGtidEventDataDeserializer () {
760+ ensureEventDataDeserializer (EventType .GTID , GtidEventDataDeserializer .class );
761+ ensureEventDataDeserializer (EventType .QUERY , QueryEventDataDeserializer .class );
762+ }
783763
784764 private void spawnKeepAliveThread () {
785765 final ExecutorService threadExecutor =
@@ -924,6 +904,14 @@ private String fetchGtidPurged() throws IOException {
924904 return "" ;
925905 }
926906
907+ protected void setupGtidSet () throws IOException {
908+ synchronized (gtidSetAccessLock ) {
909+ if (gtidSet != null && "" .equals (gtidSet .toString ()) && gtidSetFallbackToPurged ) {
910+ gtidSet = new GtidSet (fetchGtidPurged ());
911+ }
912+ }
913+ }
914+
927915 private void fetchBinlogFilenameAndPosition () throws IOException {
928916 ResultSetRowPacket [] resultSet ;
929917 channel .write (new QueryCommand ("show master status" ));
@@ -1025,7 +1013,7 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac
10251013 return result ;
10261014 }
10271015
1028- private void updateClientBinlogFilenameAndPosition (Event event ) {
1016+ protected void updateClientBinlogFilenameAndPosition (Event event ) {
10291017 EventHeader eventHeader = event .getHeader ();
10301018 EventType eventType = eventHeader .getEventType ();
10311019 if (eventType == EventType .ROTATE ) {
@@ -1044,7 +1032,7 @@ private void updateClientBinlogFilenameAndPosition(Event event) {
10441032 }
10451033 }
10461034
1047- private void updateGtidSet (Event event ) {
1035+ protected void updateGtidSet (Event event ) {
10481036 synchronized (gtidSetAccessLock ) {
10491037 if (gtidSet == null ) {
10501038 return ;
@@ -1070,34 +1058,39 @@ private void updateGtidSet(Event event) {
10701058 tx = false ;
10711059 break ;
10721060 case QUERY :
1073- case ANNOTATE_ROWS :
1074- String sql ;
1075- if (eventHeader .getEventType () == EventType .QUERY ) {
1076- QueryEventData queryEventData = (QueryEventData ) EventDataWrapper .internal (event .getData ());
1077- sql = queryEventData .getSql ();
1078- } else {
1079- AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData ) EventDataWrapper .internal (event .getData ());
1080- sql = annotateRowsEventData .getRowsQuery ();
1081- }
1082-
1061+ QueryEventData queryEventData = (QueryEventData ) EventDataWrapper .internal (event .getData ());
1062+ String sql = queryEventData .getSql ();
10831063 if (sql == null ) {
10841064 break ;
10851065 }
1086- if ("BEGIN" .equals (sql )) {
1087- tx = true ;
1088- } else
1089- if ("COMMIT" .equals (sql ) || "ROLLBACK" .equals (sql )) {
1090- commitGtid ();
1091- tx = false ;
1092- } else
1093- if (!tx ) {
1094- // auto-commit query, likely DDL
1095- commitGtid ();
1066+ commitGtid (sql );
1067+ break ;
1068+ case ANNOTATE_ROWS :
1069+ AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData ) EventDeserializer .EventDataWrapper .internal (event .getData ());
1070+ sql = annotateRowsEventData .getRowsQuery ();
1071+ if (sql == null ) {
1072+ break ;
10961073 }
1074+ commitGtid (sql );
1075+ break ;
10971076 default :
10981077 }
10991078 }
11001079
1080+ protected void commitGtid (String sql ) {
1081+ if ("BEGIN" .equals (sql )) {
1082+ tx = true ;
1083+ } else
1084+ if ("COMMIT" .equals (sql ) || "ROLLBACK" .equals (sql )) {
1085+ commitGtid ();
1086+ tx = false ;
1087+ } else
1088+ if (!tx ) {
1089+ // auto-commit query, likely DDL
1090+ commitGtid ();
1091+ }
1092+ }
1093+
11011094 private void commitGtid () {
11021095 if (gtid != null ) {
11031096 synchronized (gtidSetAccessLock ) {
@@ -1308,7 +1301,7 @@ public interface LifecycleListener {
13081301 /**
13091302 * Default (no-op) implementation of {@link LifecycleListener}.
13101303 */
1311- public static abstract class AbstractLifecycleListener implements LifecycleListener {
1304+ public static abstract class AbstractLifecycleListener implements LifecycleListener {
13121305
13131306 public void onConnect (BinaryLogClient client ) { }
13141307
0 commit comments