Skip to content

Commit 3598de7

Browse files
committed
Adding support Mariadb events, such as GTID and annotate rows event(sql)
1 parent 9cc270e commit 3598de7

13 files changed

+605
-86
lines changed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

Lines changed: 70 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,9 @@
1515
*/
1616
package com.github.shyiko.mysql.binlog;
1717

18-
import com.github.shyiko.mysql.binlog.event.Event;
19-
import com.github.shyiko.mysql.binlog.event.EventHeader;
20-
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
21-
import com.github.shyiko.mysql.binlog.event.EventType;
22-
import com.github.shyiko.mysql.binlog.event.GtidEventData;
23-
import com.github.shyiko.mysql.binlog.event.QueryEventData;
24-
import com.github.shyiko.mysql.binlog.event.RotateEventData;
25-
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
26-
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
27-
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
28-
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
18+
import com.github.shyiko.mysql.binlog.event.*;
19+
import com.github.shyiko.mysql.binlog.event.deserialization.*;
2920
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.EventDataWrapper;
30-
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
31-
import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer;
32-
import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer;
3321
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
3422
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
3523
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
@@ -46,15 +34,7 @@
4634
import com.github.shyiko.mysql.binlog.network.protocol.Packet;
4735
import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
4836
import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
49-
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateNativePasswordCommand;
50-
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSHA2Command;
51-
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSecurityPasswordCommand;
52-
import com.github.shyiko.mysql.binlog.network.protocol.command.Command;
53-
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
54-
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand;
55-
import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand;
56-
import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;
57-
import com.github.shyiko.mysql.binlog.network.protocol.command.SSLRequestCommand;
37+
import com.github.shyiko.mysql.binlog.network.protocol.command.*;
5838

5939
import javax.net.ssl.SSLContext;
6040
import javax.net.ssl.TrustManager;
@@ -141,6 +121,8 @@ public X509Certificate[] getAcceptedIssuers() {
141121
private boolean useBinlogFilenamePositionInGtidMode;
142122
private String gtid;
143123
private boolean tx;
124+
private boolean isMariadb = false;
125+
private boolean mariadbSendAnnotateRowsEvent = false;
144126

145127
private EventDeserializer eventDeserializer = new EventDeserializer();
146128

@@ -335,7 +317,12 @@ public void setGtidSet(String gtidSet) {
335317
this.binlogFilename = "";
336318
}
337319
synchronized (gtidSetAccessLock) {
338-
this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null;
320+
// mariadb GtidSet format will be domainId-serverId-sequence
321+
if (gtidSet != null && !gtidSet.contains(":")) {
322+
this.gtidSet = new MariadbGtidSet(gtidSet);
323+
} else {
324+
this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null;
325+
}
339326
}
340327
}
341328

@@ -501,6 +488,19 @@ public void setThreadFactory(ThreadFactory threadFactory) {
501488
this.threadFactory = threadFactory;
502489
}
503490

491+
public boolean isMariadbSendAnnotateRowsEvent() {
492+
return mariadbSendAnnotateRowsEvent;
493+
}
494+
495+
/**
496+
* Only in Mariadb, if set true, the Slave server connects with the BINLOG_SEND_ANNOTATE_ROWS_EVENT flag (value is 2)
497+
* in the COM_BINLOG_DUMP Slave Registration phase
498+
* @param mariadbSendAnnotateRowsEvent
499+
*/
500+
public void setMariadbSendAnnotateRowsEvent(boolean mariadbSendAnnotateRowsEvent) {
501+
this.mariadbSendAnnotateRowsEvent = mariadbSendAnnotateRowsEvent;
502+
}
503+
504504
/**
505505
* Connect to the replication stream. Note that this method blocks until disconnected.
506506
* @throws AuthenticationException if authentication fails
@@ -538,10 +538,15 @@ public void connect() throws IOException, IllegalStateException {
538538
channel.authenticationComplete();
539539

540540
connectionId = greetingPacket.getThreadId();
541+
isMariadb = greetingPacket.getServerVersion().toLowerCase().contains("mariadb");
541542
if ("".equals(binlogFilename)) {
542543
synchronized (gtidSetAccessLock) {
543544
if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) {
544-
gtidSet = new GtidSet(fetchGtidPurged());
545+
if (isMariadb) {
546+
gtidSet = new MariadbGtidSet(fetchGtidPurged());
547+
} else {
548+
gtidSet = new GtidSet(fetchGtidPurged());
549+
}
545550
}
546551
}
547552
}
@@ -594,6 +599,11 @@ public void connect() throws IOException, IllegalStateException {
594599
if (gtidSet != null) {
595600
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
596601
ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
602+
if (isMariadb) {
603+
ensureEventDataDeserializer(EventType.ANNOTATE_ROWS, AnnotateRowsEventDataDeserializer.class);
604+
ensureEventDataDeserializer(EventType.MARIADB_GTID, MariadbGtidEventDataDeserializer.class);
605+
ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class);
606+
}
597607
}
598608
}
599609
listenForEventPackets();
@@ -730,10 +740,22 @@ private void requestBinaryLogStream() throws IOException {
730740
Command dumpBinaryLogCommand;
731741
synchronized (gtidSetAccessLock) {
732742
if (gtidSet != null) {
733-
dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
734-
useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
735-
useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
736-
gtidSet);
743+
if (isMariadb) {
744+
channel.write(new QueryCommand("SET @mariadb_slave_capability=4"));
745+
checkError(channel.read());
746+
channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'"));
747+
checkError(channel.read());
748+
channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0"));
749+
checkError(channel.read());
750+
channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0"));
751+
checkError(channel.read());
752+
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isMariadbSendAnnotateRowsEvent());
753+
} else {
754+
dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
755+
useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
756+
useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
757+
gtidSet);
758+
}
737759
} else {
738760
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);
739761
}
@@ -1034,13 +1056,30 @@ private void updateGtidSet(Event event) {
10341056
GtidEventData gtidEventData = (GtidEventData) EventDataWrapper.internal(event.getData());
10351057
gtid = gtidEventData.getGtid();
10361058
break;
1059+
case MARIADB_GTID:
1060+
MariadbGtidEventData mariadbGtidEventData = (MariadbGtidEventData) EventDataWrapper.internal(event.getData());
1061+
mariadbGtidEventData.setServerId(eventHeader.getServerId());
1062+
gtid = mariadbGtidEventData.toString();
1063+
break;
1064+
case MARIADB_GTID_LIST:
1065+
MariadbGtidListEventData mariadbGtidListEventData = (MariadbGtidListEventData) EventDataWrapper.internal(event.getData());
1066+
gtid = mariadbGtidListEventData.getMariaGTIDSet().toString();
1067+
break;
10371068
case XID:
10381069
commitGtid();
10391070
tx = false;
10401071
break;
10411072
case QUERY:
1042-
QueryEventData queryEventData = (QueryEventData) EventDataWrapper.internal(event.getData());
1043-
String sql = queryEventData.getSql();
1073+
case ANNOTATE_ROWS:
1074+
String sql;
1075+
if (eventHeader.getEventType() == EventType.QUERY) {
1076+
QueryEventData queryEventData = (QueryEventData) EventDataWrapper.internal(event.getData());
1077+
sql = queryEventData.getSql();
1078+
} else {
1079+
AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData) EventDataWrapper.internal(event.getData());
1080+
sql = annotateRowsEventData.getRowsQuery();
1081+
}
1082+
10441083
if (sql == null) {
10451084
break;
10461085
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package com.github.shyiko.mysql.binlog;
2+
3+
import java.util.*;
4+
5+
/**
6+
* Mariadb Global Transaction ID
7+
*
8+
* @author <a href="mailto:winger2049@gmail.com">Winger</a>
9+
* @see <a href="https://mariadb.com/kb/en/gtid/">GTID</a> for the original doc
10+
*/
11+
public class MariadbGtidSet extends GtidSet {
12+
13+
private Map<Long, MariaGtid> map = new HashMap<>();
14+
15+
public MariadbGtidSet() {
16+
super(null); //
17+
}
18+
19+
public MariadbGtidSet(String gtidSet) {
20+
super(null);
21+
if (gtidSet != null && gtidSet.length() > 0) {
22+
String[] gtids = gtidSet.replaceAll("\n", "").split(",");
23+
for (String gtid : gtids) {
24+
MariaGtid mariaGtid = MariaGtid.parse(gtid);
25+
map.put(mariaGtid.getDomainId(), mariaGtid);
26+
}
27+
}
28+
}
29+
30+
@Override
31+
public String toString() {
32+
StringBuilder sb = new StringBuilder();
33+
for (MariaGtid gtid : map.values()) {
34+
if (sb.length() > 0) {
35+
sb.append(",");
36+
}
37+
sb.append(gtid.toString());
38+
}
39+
return sb.toString();
40+
}
41+
42+
@Override
43+
public Collection<UUIDSet> getUUIDSets() {
44+
throw new UnsupportedOperationException("Mariadb gtid not support this method");
45+
}
46+
47+
@Override
48+
public UUIDSet getUUIDSet(String uuid) {
49+
throw new UnsupportedOperationException("Mariadb gtid not support this method");
50+
}
51+
52+
@Override
53+
public UUIDSet putUUIDSet(UUIDSet uuidSet) {
54+
throw new UnsupportedOperationException("Mariadb gtid not support this method");
55+
}
56+
57+
@Override
58+
public boolean add(String gtid) {
59+
MariaGtid mariaGtid = MariaGtid.parse(gtid);
60+
map.put(mariaGtid.getDomainId(), mariaGtid);
61+
return true;
62+
}
63+
64+
public void add(MariaGtid gtid) {
65+
map.put(gtid.getDomainId(), gtid);
66+
}
67+
68+
@Override
69+
public boolean isContainedWithin(GtidSet other) {
70+
throw new UnsupportedOperationException("Mariadb gtid not support this method");
71+
}
72+
73+
@Override
74+
public int hashCode() {
75+
return map.keySet().hashCode();
76+
}
77+
78+
@Override
79+
public boolean equals(Object obj) {
80+
if (obj == this) {
81+
return true;
82+
}
83+
if (obj instanceof MariadbGtidSet) {
84+
MariadbGtidSet that = (MariadbGtidSet) obj;
85+
return this.map.equals(that.map);
86+
}
87+
return false;
88+
}
89+
90+
public static class MariaGtid {
91+
92+
// {domainId}-{serverId}-{sequence}
93+
private long domainId;
94+
private long serverId;
95+
private long sequence;
96+
97+
public MariaGtid(long domainId, long serverId, long sequence) {
98+
this.domainId = domainId;
99+
this.serverId = serverId;
100+
this.sequence = sequence;
101+
}
102+
103+
public MariaGtid(String gtid) {
104+
String[] gtidArr = gtid.split("-");
105+
this.domainId = Long.parseLong(gtidArr[0]);
106+
this.serverId = Long.parseLong(gtidArr[1]);
107+
this.sequence = Long.parseLong(gtidArr[2]);
108+
}
109+
110+
public static MariaGtid parse(String gtid) {
111+
return new MariaGtid(gtid);
112+
}
113+
114+
public long getDomainId() {
115+
return domainId;
116+
}
117+
118+
public void setDomainId(long domainId) {
119+
this.domainId = domainId;
120+
}
121+
122+
public long getServerId() {
123+
return serverId;
124+
}
125+
126+
public void setServerId(long serverId) {
127+
this.serverId = serverId;
128+
}
129+
130+
public long getSequence() {
131+
return sequence;
132+
}
133+
134+
public void setSequence(long sequence) {
135+
this.sequence = sequence;
136+
}
137+
138+
@Override
139+
public boolean equals(Object o) {
140+
if (this == o) {
141+
return true;
142+
}
143+
if (o == null || getClass() != o.getClass()) {
144+
return false;
145+
}
146+
MariaGtid mariaGtid = (MariaGtid) o;
147+
return domainId == mariaGtid.domainId &&
148+
serverId == mariaGtid.serverId &&
149+
sequence == mariaGtid.sequence;
150+
}
151+
152+
@Override
153+
public String toString() {
154+
return String.format("%s-%s-%s", domainId, serverId, sequence);
155+
}
156+
}
157+
}
158+
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.github.shyiko.mysql.binlog.event;
2+
3+
/**
4+
* Mariadb ANNOTATE_ROWS_EVENT events accompany row events and describe the query which caused the row event
5+
* Enable this with --binlog-annotate-row-events (default on from MariaDB 10.2.4).
6+
* In the binary log, each Annotate_rows event precedes the corresponding Table map event.
7+
* Note the master server sends ANNOTATE_ROWS_EVENT events only if the Slave server connects
8+
* with the BINLOG_SEND_ANNOTATE_ROWS_EVENT flag (value is 2) in the COM_BINLOG_DUMP Slave Registration phase.
9+
*
10+
* @author <a href="mailto:winger2049@gmail.com">Winger</a>
11+
* @see <a href="https://mariadb.com/kb/en/annotate_rows_event/">ANNOTATE_ROWS_EVENT</a> for the original doc
12+
*/
13+
public class AnnotateRowsEventData implements EventData {
14+
15+
private String rowsQuery;
16+
17+
public String getRowsQuery() {
18+
return rowsQuery;
19+
}
20+
21+
public void setRowsQuery(String rowsQuery) {
22+
this.rowsQuery = rowsQuery;
23+
}
24+
25+
@Override
26+
public String toString() {
27+
return "AnnotateRowsEventData{" +
28+
"rowsQuery='" + rowsQuery + '\'' +
29+
'}';
30+
}
31+
}

0 commit comments

Comments
 (0)