Skip to content

Commit 2b95a95

Browse files
committed
Fix row events for MySQL8 partitioned table
Since MySQL 8.0.16, partition info has been [added](mysql/mysql-server@e11a540) into extra_data in row event. Current code is aware of extra_data but doesn't parse it accurately. Its length is defined as 'extra_data_length / 8'. However, the [maximum length of partition info](https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/rows_event.h#L772-L814) is 5. Wrong definition leads program to read 0 (= 5/8) bytes of extra_data. This causes any row event on partitioned table leads to packet parse failure (undefined behavior). In some cases errors occur, and in other cases parse result shows incorrect filed values as in [issue #354](#354 (comment)). 1. Fix byte-length of extra_data to conform to repl protocol extra_data_length / 8 -> extra_data_length - 2 Refer to [MySQL Document](https://dev.mysql.com/doc/internals/en/rows-event.html) and [MySQL source code]((https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/src/rows_event.cpp#L415)). Above fixes failure of packet parsing, but following modification is further required to get partition info. 2. Parse extra_data according to [partition info layout](https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/rows_event.h#L772-L814).
1 parent 3de6ff4 commit 2b95a95

File tree

3 files changed

+31
-1
lines changed

3 files changed

+31
-1
lines changed

pymysqlreplication/row_event.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,17 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
5757
self.event_type == BINLOG.DELETE_ROWS_EVENT_V2 or \
5858
self.event_type == BINLOG.UPDATE_ROWS_EVENT_V2:
5959
self.flags, self.extra_data_length = struct.unpack('<HH', self.packet.read(4))
60-
self.extra_data = self.packet.read(self.extra_data_length / 8)
60+
if self.extra_data_length > 2:
61+
self.extra_data_type = struct.unpack('<B', self.packet.read(1))[0]
62+
# partition information
63+
if self.extra_data_type == 1:
64+
if self.event_type == BINLOG.UPDATE_ROWS_EVENT_V2:
65+
self.partition_id, self.source_partition_id = struct.unpack('<HH', self.packet.read(4))
66+
else:
67+
self.partition_id = struct.unpack('<H', self.packet.read(2))[0]
68+
# NDB information etc.
69+
else:
70+
self.extra_data = self.packet.read(self.extra_data_length - 3)
6171
else:
6272
self.flags = struct.unpack('<H', self.packet.read(2))[0]
6373

pymysqlreplication/tests/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ def isMySQL57(self):
6060
version = float(self.getMySQLVersion().rsplit('.', 1)[0])
6161
return version == 5.7
6262

63+
def isMySQL80AndMore(self):
64+
version = float(self.getMySQLVersion().rsplit('.', 1)[0])
65+
return version >= 8.0
66+
6367
@property
6468
def supportsGTID(self):
6569
if not self.isMySQL56AndMore():

pymysqlreplication/tests/test_data_type.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,5 +625,21 @@ def test_zerofill(self):
625625
self.assertEqual(event.rows[0]["values"]["test4"], '0000000001')
626626
self.assertEqual(event.rows[0]["values"]["test5"], '00000000000000000001')
627627

628+
def test_extra_data(self):
629+
if not self.isMySQL80AndMore():
630+
self.skipTest("Not supported in this version of MySQL")
631+
create_query = "CREATE TABLE test (id INTEGER) \
632+
PARTITION BY RANGE (id) ( \
633+
PARTITION p0 VALUES LESS THAN (1), \
634+
PARTITION p1 VALUES LESS THAN (2), \
635+
PARTITION p2 VALUES LESS THAN (3), \
636+
PARTITION p3 VALUES LESS THAN (4), \
637+
PARTITION p4 VALUES LESS THAN (5), \
638+
)"
639+
insert_query = "INSERT INTO test (id) VALUES(3)"
640+
event = self.create_and_insert_value(create_query, insert_query)
641+
self.assertEqual(event.extra_data_type, 1)
642+
self.assertEqual(event.partition_id, 3)
643+
628644
if __name__ == "__main__":
629645
unittest.main()

0 commit comments

Comments
 (0)