Skip to content

Commit a7f2de3

Browse files
author
Somesh Malviya
committed
Add support for compressed binlogs https://issues.redhat.com/browse/DBZ-2663
1 parent c19b1ad commit a7f2de3

File tree

8 files changed

+319
-1
lines changed

8 files changed

+319
-1
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@
7777
<version>2.9.10.3</version>
7878
<scope>test</scope>
7979
</dependency>
80+
<dependency>
81+
<groupId>com.github.luben</groupId>
82+
<artifactId>zstd-jni</artifactId>
83+
<version>1.5.0-2</version>
84+
<scope>compile</scope>
85+
</dependency>
8086
</dependencies>
8187
<build>
8288
<plugins>

src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,17 @@ public enum EventType {
192192
/**
193193
* Prepared XA transaction terminal event similar to XID except that it is specific to XA transaction.
194194
*/
195-
XA_PREPARE;
195+
XA_PREPARE,
196+
/**
197+
* Extension of UPDATE_ROWS_EVENT, allowing partial values according
198+
* to binlog_row_value_options.
199+
*/
200+
PARTIAL_UPDATE_ROWS_EVENT,
201+
/**
202+
* Generated when 'binlog_transaction_compression' is set to 'ON'.
203+
* It encapsulates all the events of a transaction in a Zstd compressed payload.
204+
*/
205+
TRANSACTION_PAYLOAD;
196206

197207
public static boolean isRowMutation(EventType eventType) {
198208
return EventType.isWrite(eventType) ||
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.github.shyiko.mysql.binlog.event;
2+
3+
import java.util.ArrayList;
4+
5+
6+
public class TransactionPayloadEventData implements EventData {
7+
private int payloadSize;
8+
private int uncompressedSize;
9+
private int compressionType;
10+
private byte[] payload;
11+
private ArrayList<Event> uncompressedEvents;
12+
13+
public ArrayList<Event> getUncompressedEvents() {
14+
return uncompressedEvents;
15+
}
16+
17+
public void setUncompressedEvents(ArrayList<Event> uncompressedEvents) {
18+
this.uncompressedEvents = uncompressedEvents;
19+
}
20+
21+
public int getPayloadSize() {
22+
return payloadSize;
23+
}
24+
25+
public void setPayloadSize(int payloadSize) {
26+
this.payloadSize = payloadSize;
27+
}
28+
29+
public int getUncompressedSize() {
30+
return uncompressedSize;
31+
}
32+
33+
public void setUncompressedSize(int uncompressedSize) {
34+
this.uncompressedSize = uncompressedSize;
35+
}
36+
37+
public int getCompressionType() {
38+
return compressionType;
39+
}
40+
41+
public void setCompressionType(int compressionType) {
42+
this.compressionType = compressionType;
43+
}
44+
45+
public byte[] getPayload() {
46+
return payload;
47+
}
48+
49+
public void setPayload(byte[] payload) {
50+
this.payload = payload;
51+
}
52+
53+
@Override
54+
public String toString() {
55+
final StringBuilder sb = new StringBuilder();
56+
sb.append("TransactionPayloadEventData");
57+
sb.append("{compression_type=").append(compressionType).append(", payload_size=").append(payloadSize).append(", uncompressed_size='").append(uncompressedSize).append('\'');
58+
sb.append(", payload: ");
59+
sb.append("\n");
60+
for (Event e : uncompressedEvents) {
61+
sb.append(e.toString());
62+
sb.append("\n");
63+
}
64+
sb.append("}");
65+
return sb.toString();
66+
}
67+
}

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.github.shyiko.mysql.binlog.event.FormatDescriptionEventData;
2323
import com.github.shyiko.mysql.binlog.event.LRUCache;
2424
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
25+
import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData;
2526
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
2627

2728
import java.io.IOException;
@@ -120,6 +121,8 @@ private void registerDefaultEventDataDeserializers() {
120121
new PreviousGtidSetDeserializer());
121122
eventDataDeserializers.put(EventType.XA_PREPARE,
122123
new XAPrepareEventDataDeserializer());
124+
eventDataDeserializers.put(EventType.TRANSACTION_PAYLOAD,
125+
new TransactionPayloadEventDataDeserializer());
123126
}
124127

125128
public void setEventDataDeserializer(EventType eventType, EventDataDeserializer eventDataDeserializer) {
@@ -227,6 +230,9 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
227230
case TABLE_MAP:
228231
eventData = deserializeTableMapEventData(inputStream, eventHeader);
229232
break;
233+
case TRANSACTION_PAYLOAD:
234+
eventData = deserializeTransactionPayloadEventData(inputStream, eventHeader);
235+
break;
230236
default:
231237
EventDataDeserializer eventDataDeserializer = getEventDataDeserializer(eventHeader.getEventType());
232238
eventData = deserializeEventData(inputStream, eventHeader, eventDataDeserializer);
@@ -272,6 +278,26 @@ private EventData deserializeFormatDescriptionEventData(ByteArrayInputStream inp
272278
return eventData;
273279
}
274280

281+
public EventData deserializeTransactionPayloadEventData(ByteArrayInputStream inputStream, EventHeader eventHeader)
282+
throws IOException {
283+
EventDataDeserializer eventDataDeserializer = eventDataDeserializers.get(EventType.TRANSACTION_PAYLOAD);
284+
EventData eventData = deserializeEventData(inputStream, eventHeader, eventDataDeserializer);
285+
TransactionPayloadEventData transactionPayloadEventData = (TransactionPayloadEventData) eventData;
286+
287+
/**
288+
* Handling for TABLE_MAP events withing the transaction payload event. This is to ensure that for the table map
289+
* events within the transaction payload, the target table id and the event gets added to the
290+
* tableMapEventByTableId map. This is map is later used while deserializing rows.
291+
*/
292+
for (Event event : transactionPayloadEventData.getUncompressedEvents()) {
293+
if (event.getHeader().getEventType() == EventType.TABLE_MAP && event.getData() != null) {
294+
TableMapEventData tableMapEvent = (TableMapEventData) event.getData();
295+
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
296+
}
297+
}
298+
return eventData;
299+
}
300+
275301
public EventData deserializeTableMapEventData(ByteArrayInputStream inputStream, EventHeader eventHeader)
276302
throws IOException {
277303
EventDataDeserializer eventDataDeserializer =
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2013 Stanley Shyiko
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.shyiko.mysql.binlog.event.deserialization;
17+
18+
import com.github.luben.zstd.Zstd;
19+
import com.github.shyiko.mysql.binlog.event.Event;
20+
import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData;
21+
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
22+
23+
import java.io.IOException;
24+
import java.nio.ByteBuffer;
25+
import java.util.ArrayList;
26+
27+
/**
28+
* @author <a href="mailto:somesh.malviya@booking.com">Somesh Malviya</a>
29+
* @author <a href="mailto:debjeet.sarkar@booking.com">Debjeet Sarkar</a>
30+
*/
31+
public class TransactionPayloadEventDataDeserializer implements EventDataDeserializer<TransactionPayloadEventData> {
32+
public static final int OTW_PAYLOAD_HEADER_END_MARK = 0;
33+
public static final int OTW_PAYLOAD_SIZE_FIELD = 1;
34+
public static final int OTW_PAYLOAD_COMPRESSION_TYPE_FIELD = 2;
35+
public static final int OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3;
36+
37+
@Override
38+
public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
39+
TransactionPayloadEventData eventData = new TransactionPayloadEventData();
40+
// Read the header fields from the event data
41+
while (inputStream.available() > 0) {
42+
int fieldType = 0;
43+
int fieldLen = 0;
44+
// Read the type of the field
45+
if (inputStream.available() >= 1) {
46+
fieldType = inputStream.readPackedInteger();
47+
}
48+
// We have reached the end of the Event Data Header
49+
if (fieldType == OTW_PAYLOAD_HEADER_END_MARK) {
50+
break;
51+
}
52+
// Read the size of the field
53+
if (inputStream.available() >= 1) {
54+
fieldLen = inputStream.readPackedInteger();
55+
}
56+
switch (fieldType) {
57+
case OTW_PAYLOAD_SIZE_FIELD:
58+
// Fetch the payload size
59+
eventData.setPayloadSize(inputStream.readPackedInteger());
60+
break;
61+
case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD:
62+
// Fetch the compression type
63+
eventData.setCompressionType(inputStream.readPackedInteger());
64+
break;
65+
case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD:
66+
// Fetch the uncompressed size
67+
eventData.setUncompressedSize(inputStream.readPackedInteger());
68+
break;
69+
default:
70+
// Ignore unrecognized field
71+
inputStream.read(fieldLen);
72+
break;
73+
}
74+
}
75+
if (eventData.getUncompressedSize() == 0) {
76+
// Default the uncompressed to the payload size
77+
eventData.setUncompressedSize(eventData.getPayloadSize());
78+
}
79+
// set the payload to the rest of the input buffer
80+
eventData.setPayload(inputStream.read(eventData.getPayloadSize()));
81+
82+
// Decompress the payload
83+
byte[] src = eventData.getPayload();
84+
byte[] dst = ByteBuffer.allocate(eventData.getUncompressedSize()).array();
85+
Zstd.decompressByteArray(dst, 0, dst.length, src, 0, src.length);
86+
87+
// Read and store events from decompressed byte array into input stream
88+
ArrayList<Event> decompressedEvents = new ArrayList<>();
89+
EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer();
90+
ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(dst);
91+
92+
Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream);
93+
while(internalEvent != null) {
94+
decompressedEvents.add(internalEvent);
95+
internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream);
96+
}
97+
98+
eventData.setUncompressedEvents(decompressedEvents);
99+
100+
return eventData;
101+
}
102+
}

src/test/java/com/github/shyiko/mysql/binlog/BinaryLogFileReaderIntegrationTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ public void testNextEvent() throws Exception {
4646
readAll(reader, 1462);
4747
}
4848

49+
@Test
50+
public void testNextEventCompressed() throws Exception {
51+
BinaryLogFileReader reader = new BinaryLogFileReader(
52+
new FileInputStream("src/test/resources/mysql-bin.compressed"));
53+
readAll(reader, 5);
54+
}
55+
4956
@Test
5057
public void testChecksumNONE() throws Exception {
5158
EventDeserializer eventDeserializer = new EventDeserializer();
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2013 Stanley Shyiko
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.shyiko.mysql.binlog.event.deserialization;
17+
18+
import com.github.shyiko.mysql.binlog.event.EventType;
19+
import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData;
20+
import com.github.shyiko.mysql.binlog.event.XAPrepareEventData;
21+
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
22+
import org.testng.annotations.Test;
23+
24+
import java.io.IOException;
25+
26+
import static org.testng.Assert.assertEquals;
27+
28+
/**
29+
* @author <a href="mailto:somesh.malviya@booking.com">Somesh Malviya</a>
30+
*/
31+
public class TransactionPayloadEventDataDeserializerTest {
32+
33+
/* DATA is a binary representation of following:
34+
TransactionPayloadEventData{compression_type=0, payload_size=451, uncompressed_size='960', payload:
35+
Event{header=EventHeaderV4{timestamp=1646406641000, eventType=QUERY, serverId=223344, headerLength=19, dataLength=57, nextPosition=0, flags=8}, data=QueryEventData{threadId=12, executionTime=0, errorCode=0, database='', sql='BEGIN'}}
36+
Event{header=EventHeaderV4{timestamp=1646406641000, eventType=TABLE_MAP, serverId=223344, headerLength=19, dataLength=63, nextPosition=0, flags=0}, data=TableMapEventData{tableId=84, database='demo', table='movies', columnTypes=3, 15, 3, 15, 15, 15, 15, 15, 15, 15, 15, columnMetadata=0, 1024, 0, 1024, 1024, 4096, 2048, 1024, 1024, 1024, 1024, columnNullability={}, eventMetadata=TableMapEventMetadata{signedness={}, defaultCharset=255, charsetCollations=null, columnCharsets=null, columnNames=null, setStrValues=null, enumStrValues=null, geometryTypes=null, simplePrimaryKeys=null, primaryKeysWithPrefix=null, enumAndSetDefaultCharset=null, enumAndSetColumnCharsets=null,visibility=null}}}
37+
Event{header=EventHeaderV4{timestamp=1646406641000, eventType=EXT_UPDATE_ROWS, serverId=223344, headerLength=19, dataLength=756, nextPosition=0, flags=0}, data=UpdateRowsEventData{tableId=84, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, includedColumns={0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, rows=[
38+
{before=[1, Once Upon a Time in the West, 1968, Italy, Western, Claudia Cardinale|Charles Bronson|Henry Fonda|Gabriele Ferzetti|Frank Wolff|Al Mulock|Jason Robards|Woody Strode|Jack Elam|Lionel Stander|Paolo Stoppa|Keenan Wynn|Aldo Sambrell, Sergio Leone, Ennio Morricone, Sergio Leone|Sergio Donati|Dario Argento|Bernardo Bertolucci, Tonino Delli Colli, Paramount Pictures], after=[1, Once Upon a Time in the West, 1968, Italy, Western|Action, Claudia Cardinale|Charles Bronson|Henry Fonda|Gabriele Ferzetti|Frank Wolff|Al Mulock|Jason Robards|Woody Strode|Jack Elam|Lionel Stander|Paolo Stoppa|Keenan Wynn|Aldo Sambrell, Sergio Leone, Ennio Morricone, Sergio Leone|Sergio Donati|Dario Argento|Bernardo Bertolucci, Tonino Delli Colli, Paramount Pictures]}
39+
]}}
40+
Event{header=EventHeaderV4{timestamp=1646406641000, eventType=XID, serverId=223344, headerLength=19, dataLength=8, nextPosition=0, flags=0}, data=XidEventData{xid=31}}
41+
}
42+
*/
43+
private static final byte[] DATA = {
44+
2, 1, 0, 3, 3, -4, -64, 3, 1, 3, -4, -61, 1, 0, 40, -75, 47, -3, 0, 88, -68, 13, 0, -90, -34,
45+
97, 57, 96, 103, -108, 14, 32, 1, 32, 8, -126, 32, 120, 18, 103, 8, -126, -114, 45, -84, -15,
46+
-9, -66, 68, 74, -118, -40, 82, 68, -110, 16, 13, -122, 26, 35, 98, 20, 123, 16, 7, -5, -10, 69,
47+
-128, 37, 107, 91, -42, 50, -10, -116, -6, -79, 51, 11, 93, -14, 73, 10, 87, 0, 81, 0, 81, 0,
48+
-1, -95, 63, -53, -78, 76, -31, -116, -56, -15, -88, -70, 26, 36, -55, -28, -13, 44, 66, -60,
49+
56, 4, -3, 113, -122, -58, 35, -112, 8, 18, 41, 28, -37, -42, -96, -83, -124, -73, -75, 84, -29,
50+
-48, 41, 62, -15, -88, -70, 6, 72, -110, -55, 71, -63, -125, 3, -90, -14, 103, -111, 67, 1, -98,
51+
-3, -15, 71, -125, -126, 88, -108, -16, -1, -104, 7, 79, -24, 6, -66, -16, -57, 53, -113, -86,
52+
-117, 33, 73, 38, -97, -100, -68, 96, 125, -103, -40, 32, 92, 7, 111, 51, -71, 110, -37, -109,
53+
-44, 33, 42, -59, -99, 73, -49, -29, 69, 16, -71, 49, -18, 87, 73, 108, -35, -45, -54, 18, -41,
54+
41, 55, -22, -87, 37, -75, 81, 29, 117, -106, 67, -32, -73, 16, 91, -50, 29, 30, -89, -16, -31,
55+
0, 126, 7, 4, -120, 45, 39, -73, -126, -55, 45, -41, 106, 20, -87, -55, 125, 49, -56, -99, 120,
56+
-63, 11, 4, -116, 57, 100, -71, 87, -109, -35, 44, -34, 110, -66, -32, -36, 62, -55, -46, 77,
57+
54, -27, 40, -111, -39, -61, 73, 86, -34, 77, 16, -11, -70, 26, 110, -78, 93, -85, 68, 124, 75,
58+
-79, -62, 77, -70, -27, 110, -102, 104, -87, -61, -28, -59, -92, 16, 113, -87, 126, 112, -109,
59+
30, -86, -101, 19, 49, -22, -87, -44, 19, -55, -115, 41, 68, -68, -104, -38, 117, 34, -46, 81,
60+
98, 69, -123, -21, -1, -1, -65, -31, 4, 30, 85, 23, -125, 36, -103, 124, -70, -63, -119, 18, 5,
61+
96, -5, 58, 112, 106, 18, 9, -71, -45, -106, 62, -107, 120, -92, 57, -41, -106, 108, -50, -19,
62+
37, -101, 27, 55, -59, 35, 109, -102, 58, -82, -31, -37, 74, 54, -11, -108, -33, 86, 98, 67, 94,
63+
-117, 71, -55, 110, 79, 47, -79, 65, -27, -66, -60, 3, -53, 61, -75, -9, 58, 34, -69, 113, 18,
64+
0, 9, -123, 64, 53, 121, 75, 21, -68, 7, 33, -73, -30, -127, -103, 9, 17, 66, -49, 84, 65, 2,
65+
43, 16, -125, 0, 43, 55, 114, 109, 4, -50, -64, -62, -64, 99, 0, 28, -96, 53, -96, -13, 0, -68,
66+
1, 0, 0
67+
};
68+
69+
// Compression type for Zstd is 0
70+
private static final int COMPRESSION_TYPE = 0;
71+
private static final int PAYLOAD_SIZE = 451;
72+
private static final int UNCOMPRESSED_SIZE = 960;
73+
private static final int NUMBER_OF_UNCOMPRESSED_EVENTS = 4;
74+
private static final String UNCOMPRESSED_UPDATE_EVENT =
75+
new StringBuilder()
76+
.append(
77+
"UpdateRowsEventData{tableId=84, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, includedColumns={0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, rows=[\n")
78+
.append(
79+
" {before=[1, Once Upon a Time in the West, 1968, Italy, Western, Claudia Cardinale|Charles Bronson|Henry Fonda|Gabriele Ferzetti|Frank Wolff|Al Mulock|Jason Robards|Woody Strode|Jack Elam|Lionel Stander|Paolo Stoppa|Keenan Wynn|Aldo Sambrell, Sergio Leone, Ennio Morricone, Sergio Leone|Sergio Donati|Dario Argento|Bernardo Bertolucci, Tonino Delli Colli, Paramount Pictures],")
80+
.append(
81+
" after=[1, Once Upon a Time in the West, 1968, Italy, Western|Action, Claudia Cardinale|Charles Bronson|Henry Fonda|Gabriele Ferzetti|Frank Wolff|Al Mulock|Jason Robards|Woody Strode|Jack Elam|Lionel Stander|Paolo Stoppa|Keenan Wynn|Aldo Sambrell, Sergio Leone, Ennio Morricone, Sergio Leone|Sergio Donati|Dario Argento|Bernardo Bertolucci, Tonino Delli Colli, Paramount Pictures]}\n")
82+
.append("]}")
83+
.toString();
84+
85+
@Test
86+
public void deserialize() throws IOException {
87+
TransactionPayloadEventDataDeserializer deserializer = new TransactionPayloadEventDataDeserializer();
88+
TransactionPayloadEventData transactionPayloadEventData =
89+
deserializer.deserialize(new ByteArrayInputStream(DATA));
90+
assertEquals(COMPRESSION_TYPE, transactionPayloadEventData.getCompressionType());
91+
assertEquals(PAYLOAD_SIZE, transactionPayloadEventData.getPayloadSize());
92+
assertEquals(UNCOMPRESSED_SIZE, transactionPayloadEventData.getUncompressedSize());
93+
assertEquals(NUMBER_OF_UNCOMPRESSED_EVENTS, transactionPayloadEventData.getUncompressedEvents().size());
94+
assertEquals(EventType.QUERY, transactionPayloadEventData.getUncompressedEvents().get(0).getHeader().getEventType());
95+
assertEquals(EventType.TABLE_MAP, transactionPayloadEventData.getUncompressedEvents().get(1).getHeader().getEventType());
96+
assertEquals(EventType.EXT_UPDATE_ROWS, transactionPayloadEventData.getUncompressedEvents().get(2).getHeader().getEventType());
97+
assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType());
98+
assertEquals(UNCOMPRESSED_UPDATE_EVENT, transactionPayloadEventData.getUncompressedEvents().get(2).getData().toString());
99+
}
100+
}
771 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)