33import pymysql
44import struct
55
6- from pymysql .constants .COMMAND import COM_BINLOG_DUMP
6+ from pymysql .constants .COMMAND import COM_BINLOG_DUMP , COM_REGISTER_SLAVE
77from pymysql .cursors import DictCursor
88from pymysql .util import int2byte
99
3030MYSQL_EXPECTED_ERROR_CODES = [2013 , 2006 ]
3131
3232
33+ class ReportSlave (object ):
34+
35+ """Represent the values that you may report when connecting as a slave
36+ to a master. SHOW SLAVE HOSTS related"""
37+
38+ hostname = ''
39+ username = ''
40+ password = ''
41+ port = 0
42+
43+ def __init__ (self , value ):
44+ """
45+ Attributes:
46+ value: string or tuple
47+ if string, then it will be used hostname
48+ if tuple it will be used as (hostname, user, password, port)
49+ """
50+
51+ if isinstance (value , tuple ):
52+ l = len (value )
53+ if l > 1 :
54+ self .hostname = value [0 ]
55+ if l > 2 :
56+ self .username = value [1 ]
57+ if l > 3 :
58+ self .password = value [2 ]
59+ if l > 4 :
60+ self .port = int (value [3 ])
61+ else :
62+ self .hostname = value
63+
64+ def __repr__ (self ):
65+ return '<ReportSlave hostname=%s username=%s password=%s port=%d>' % \
66+ (self .hostname , self .username , self .password , self .port )
67+
68+ def encoded (self , server_id , master_id = 0 ):
69+ """
70+ server_id: the slave server-id
71+ master_id: usually 0. Appears as "master id" in SHOW SLAVE HOSTS
72+ on the master. Unknown what else it impacts.
73+ """
74+
75+ # 1 [15] COM_REGISTER_SLAVE
76+ # 4 server-id
77+ # 1 slaves hostname length
78+ # string[$len] slaves hostname
79+ # 1 slaves user len
80+ # string[$len] slaves user
81+ # 1 slaves password len
82+ # string[$len] slaves password
83+ # 2 slaves mysql-port
84+ # 4 replication rank
85+ # 4 master-id
86+
87+ lhostname = len (self .hostname )
88+ lusername = len (self .username )
89+ lpassword = len (self .password )
90+
91+ packet_len = (1 + # command
92+ 4 + # server-id
93+ 1 + # hostname length
94+ lhostname +
95+ 1 + # username length
96+ lusername +
97+ 1 + # password length
98+ lpassword +
99+ 2 + # slave mysql port
100+ 4 + # replication rank
101+ 4 ) # master-id
102+
103+ MAX_STRING_LEN = 257 # one byte for length + 256 chars
104+
105+ return (struct .pack ('<i' , packet_len ) +
106+ int2byte (COM_REGISTER_SLAVE ) +
107+ struct .pack ('<L' , server_id ) +
108+ struct .pack ('<%dp' % min (MAX_STRING_LEN , lhostname + 1 ),
109+ self .hostname ) +
110+ struct .pack ('<%dp' % min (MAX_STRING_LEN , lusername + 1 ),
111+ self .username ) +
112+ struct .pack ('<%dp' % min (MAX_STRING_LEN , lpassword + 1 ),
113+ self .password ) +
114+ struct .pack ('<H' , self .port ) +
115+ struct .pack ('<l' , 0 ) +
116+ struct .pack ('<l' , master_id ))
117+
33118
34119class BinLogStreamReader (object ):
120+
35121 """Connect to replication stream and read event
36122 """
123+ report_slave = None
37124
38125 def __init__ (self , connection_settings , server_id , resume_stream = False ,
39126 blocking = False , only_events = None , log_file = None , log_pos = None ,
40127 filter_non_implemented_events = True ,
41128 ignored_events = None , auto_position = None ,
42129 only_tables = None , only_schemas = None ,
43- freeze_schema = False , skip_to_timestamp = None ):
130+ freeze_schema = False , skip_to_timestamp = None ,
131+ report_slave = None ):
44132 """
45133 Attributes:
46134 resume_stream: Start for event from position or the latest event of
@@ -55,6 +143,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
55143 only_schemas: An array with the schemas you want to watch
56144 freeze_schema: If true do not support ALTER TABLE. It's faster.
57145 skip_to_timestamp: Ignore all events until reaching specified timestamp.
146+ report_slave: Report slave in SHOW SLAVE HOSTS.
58147 """
59148 self .__connection_settings = connection_settings
60149 self .__connection_settings ["charset" ] = "utf8"
@@ -85,6 +174,9 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
85174 self .auto_position = auto_position
86175 self .skip_to_timestamp = skip_to_timestamp
87176
177+ if report_slave :
178+ self .report_slave = ReportSlave (report_slave )
179+
88180 def close (self ):
89181 if self .__connected_stream :
90182 self ._stream_connection .close ()
@@ -118,6 +210,21 @@ def __checksum_enabled(self):
118210 return False
119211 return True
120212
213+ def _register_slave (self ):
214+ if not self .report_slave :
215+ return
216+
217+ packet = self .report_slave .encoded (self .__server_id )
218+
219+ if pymysql .__version__ < "0.6" :
220+ self ._stream_connection .wfile .write (packet )
221+ self ._stream_connection .wfile .flush ()
222+ self ._stream_connection .read_packet ()
223+ else :
224+ self ._stream_connection ._write_bytes (packet )
225+ self ._stream_connection ._next_seq_id = 1
226+ self ._stream_connection ._read_packet ()
227+
121228 def __connect_to_stream (self ):
122229 # log_pos (4) -- position in the binlog-file to start the stream with
123230 # flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1)
@@ -134,6 +241,8 @@ def __connect_to_stream(self):
134241 cur .execute ("set @master_binlog_checksum= @@global.binlog_checksum" )
135242 cur .close ()
136243
244+ self ._register_slave ()
245+
137246 if not self .auto_position :
138247 # only when log_file and log_pos both provided, the position info is
139248 # valid, if not, get the current position from master
0 commit comments