33import struct
44
55from pymysqlreplication import constants , event , row_event
6- from typing import List , Tuple , Any , Dict , Optional , Union
6+
7+ from typing import List , Tuple , Dict , Optional , Union
8+ from pymysql .connections import MysqlPacket
79
810# Constants from PyMYSQL source code
911NULL_COLUMN = 251
1618UNSIGNED_INT24_LENGTH = 3
1719UNSIGNED_INT64_LENGTH = 8
1820
19-
2021JSONB_TYPE_SMALL_OBJECT = 0x0
2122JSONB_TYPE_LARGE_OBJECT = 0x1
2223JSONB_TYPE_SMALL_ARRAY = 0x2
3738JSONB_LITERAL_FALSE = 0x2
3839
3940
40- def read_offset_or_inline (packet , large : bool ) -> Tuple [Any , Any , Any ]:
41- t = packet .read_uint8 ()
42-
43- if t in (JSONB_TYPE_LITERAL ,
44- JSONB_TYPE_INT16 , JSONB_TYPE_UINT16 ):
45- return t , None , packet .read_binary_json_type_inlined (t , large )
46- if large and t in (JSONB_TYPE_INT32 , JSONB_TYPE_UINT32 ):
47- return t , None , packet .read_binary_json_type_inlined (t , large )
48-
49- if large :
50- return t , packet .read_uint32 (), None
51- return t , packet .read_uint16 (), None
52-
53-
5441class BinLogPacketWrapper (object ):
5542 """
5643 Bin Log Packet Wrapper uses an existing packet object and wraps around it,
@@ -82,7 +69,7 @@ class BinLogPacketWrapper(object):
8269 constants .DELETE_ROWS_EVENT_V2 : row_event .DeleteRowsEvent ,
8370 constants .TABLE_MAP_EVENT : row_event .TableMapEvent ,
8471
85- #5.6 GTID enabled replication events
72+ # 5.6 GTID enabled replication events
8673 constants .ANONYMOUS_GTID_LOG_EVENT : event .NotImplementedEvent ,
8774 constants .ANONYMOUS_GTID_LOG_EVENT : event .NotImplementedEvent ,
8875 constants .PREVIOUS_GTIDS_LOG_EVENT : event .NotImplementedEvent ,
@@ -95,7 +82,8 @@ class BinLogPacketWrapper(object):
9582 constants .MARIADB_START_ENCRYPTION_EVENT : event .MariadbStartEncryptionEvent
9683 }
9784
98- def __init__ (self , from_packet ,
85+ def __init__ (self ,
86+ from_packet ,
9987 table_map ,
10088 ctl_connection ,
10189 mysql_version ,
@@ -438,13 +426,13 @@ def read_binary_json_object(self, length: int, large: bool) -> Dict[str, str]:
438426 if large :
439427 key_offset_lengths = [(
440428 self .read_uint32 (), # offset (we don't actually need that)
441- self .read_uint16 () # size of the key
442- ) for _ in range (elements )]
429+ self .read_uint16 () # size of the key
430+ ) for _ in range (elements )]
443431 else :
444432 key_offset_lengths = [(
445433 self .read_uint16 (), # offset (we don't actually need that)
446- self .read_uint16 () # size of key
447- ) for _ in range (elements )]
434+ self .read_uint16 () # size of key
435+ ) for _ in range (elements )]
448436
449437 value_type_inlined_lengths = [read_offset_or_inline (self , large )
450438 for _ in range (elements )]
@@ -498,3 +486,18 @@ def read_string(self) -> bytes:
498486 string += char
499487
500488 return string
489+
490+
491+ def read_offset_or_inline (packet : Union [MysqlPacket , BinLogPacketWrapper ], large : bool ) \
492+ -> Tuple [int , Optional [int ], Optional [Union [bool , str ]]]:
493+ t = packet .read_uint8 ()
494+
495+ if t in (JSONB_TYPE_LITERAL ,
496+ JSONB_TYPE_INT16 , JSONB_TYPE_UINT16 ):
497+ return t , None , packet .read_binary_json_type_inlined (t , large )
498+ if large and t in (JSONB_TYPE_INT32 , JSONB_TYPE_UINT32 ):
499+ return t , None , packet .read_binary_json_type_inlined (t , large )
500+
501+ if large :
502+ return t , packet .read_uint32 (), None
503+ return t , packet .read_uint16 (), None
0 commit comments