99
1010class BinLogEvent (object ):
1111 def __init__ (self , from_packet , event_size , table_map , ctl_connection ,
12+ mysql_version = (0 ,0 ,0 ),
1213 only_tables = None ,
1314 ignored_tables = None ,
1415 only_schemas = None ,
@@ -21,6 +22,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
2122 self .timestamp = self .packet .timestamp
2223 self .event_size = event_size
2324 self ._ctl_connection = ctl_connection
25+ self .mysql_version = mysql_version
2426 self ._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
2527 # The event have been fully processed, if processed is false
2628 # the event will be skipped
@@ -58,6 +60,11 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
5860 self .commit_flag = byte2int (self .packet .read (1 )) == 1
5961 self .sid = self .packet .read (16 )
6062 self .gno = struct .unpack ('<Q' , self .packet .read (8 ))[0 ]
63+ self .lt_type = byte2int (self .packet .read (1 ))
64+
65+ if self .mysql_version >= (5 , 7 ):
66+ self .last_committed = struct .unpack ('<Q' , self .packet .read (8 ))[0 ]
67+ self .sequence_number = struct .unpack ('<Q' , self .packet .read (8 ))[0 ]
6168
6269 @property
6370 def gtid (self ):
@@ -73,6 +80,9 @@ def gtid(self):
7380 def _dump (self ):
7481 print ("Commit: %s" % self .commit_flag )
7582 print ("GTID_NEXT: %s" % self .gtid )
83+ if hasattr (self , "last_committed" ):
84+ print ("last_committed: %d" % self .last_committed )
85+ print ("sequence_number: %d" % self .sequence_number )
7686
7787 def __repr__ (self ):
7888 return '<GtidEvent "%s">' % self .gtid
@@ -98,8 +108,49 @@ def dump(self):
98108 print ()
99109
100110
111+ class XAPrepareEvent (BinLogEvent ):
112+ """An XA prepare event is generated for a XA prepared transaction.
113+ Like Xid_event it contans XID of the *prepared* transaction
114+
115+ Attributes:
116+ one_phase: current XA transaction commit method
117+ xid: serialized XID representation of XA transaction
118+ """
119+ def __init__ (self , from_packet , event_size , table_map , ctl_connection , ** kwargs ):
120+ super (XAPrepareEvent , self ).__init__ (from_packet , event_size , table_map ,
121+ ctl_connection , ** kwargs )
122+
123+ # one_phase is True: XA COMMIT ... ONE PHASE
124+ # one_phase is False: XA PREPARE
125+ self .one_phase = (self .packet .read (1 ) != b'\x00 ' )
126+ self .xid_format_id = struct .unpack ('<I' , self .packet .read (4 ))[0 ]
127+ gtrid_length = struct .unpack ('<I' , self .packet .read (4 ))[0 ]
128+ bqual_length = struct .unpack ('<I' , self .packet .read (4 ))[0 ]
129+ self .xid_gtrid = self .packet .read (gtrid_length )
130+ self .xid_bqual = self .packet .read (bqual_length )
131+
132+ @property
133+ def xid (self ):
134+ return self .xid_gtrid .decode () + self .xid_bqual .decode ()
135+
136+ def _dump (self ):
137+ print ("One phase: %s" % self .one_phase )
138+ print ("XID formatID: %d" % self .xid_format_id )
139+ print ("XID: %s" % self .xid )
140+
141+
101142class FormatDescriptionEvent (BinLogEvent ):
102- pass
143+ def __init__ (self , from_packet , event_size , table_map , ctl_connection , ** kwargs ):
144+ super (FormatDescriptionEvent , self ).__init__ (from_packet , event_size , table_map ,
145+ ctl_connection , ** kwargs )
146+ self .binlog_version = struct .unpack ('<H' , self .packet .read (2 ))
147+ self .mysql_version_str = self .packet .read (50 ).rstrip (b'\0 ' ).decode ()
148+ numbers = self .mysql_version_str .split ('-' )[0 ]
149+ self .mysql_version = tuple (map (int , numbers .split ('.' )))
150+
151+ def _dump (self ):
152+ print ("Binlog version: %s" % self .binlog_version )
153+ print ("MySQL version: %s" % self .mysql_version_str )
103154
104155
105156class StopEvent (BinLogEvent ):
0 commit comments