@@ -132,7 +132,8 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
132132 ignored_events = None , auto_position = None ,
133133 only_tables = None , only_schemas = None ,
134134 freeze_schema = False , skip_to_timestamp = None ,
135- report_slave = None , slave_uuid = None ):
135+ report_slave = None , slave_uuid = None ,
136+ pymysql_wrapper = None ):
136137 """
137138 Attributes:
138139 resume_stream: Start for event from position or the latest event of
@@ -183,6 +184,11 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
183184 self .report_slave = ReportSlave (report_slave )
184185 self .slave_uuid = slave_uuid
185186
187+ if pymysql_wrapper :
188+ self .pymysql_wrapper = pymysql_wrapper
189+ else :
190+ self .pymysql_wrapper = pymysql .connect
191+
186192 def close (self ):
187193 if self .__connected_stream :
188194 self ._stream_connection .close ()
@@ -198,7 +204,7 @@ def __connect_to_ctl(self):
198204 self ._ctl_connection_settings = dict (self .__connection_settings )
199205 self ._ctl_connection_settings ["db" ] = "information_schema"
200206 self ._ctl_connection_settings ["cursorclass" ] = DictCursor
201- self ._ctl_connection = pymysql . connect (** self ._ctl_connection_settings )
207+ self ._ctl_connection = self . pymysql_wrapper (** self ._ctl_connection_settings )
202208 self ._ctl_connection ._get_table_information = self .__get_table_information
203209 self .__connected_ctl = True
204210
@@ -236,7 +242,7 @@ def __connect_to_stream(self):
236242 # flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1)
237243 # server_id (4) -- server id of this slave
238244 # log_file (string.EOF) -- filename of the binlog on the master
239- self ._stream_connection = pymysql . connect (** self .__connection_settings )
245+ self ._stream_connection = self . pymysql_wrapper (** self .__connection_settings )
240246
241247 self .__use_checksum = self .__checksum_enabled ()
242248
0 commit comments