44use MySQLReplication \BinaryDataReader \BinaryDataReader ;
55use MySQLReplication \BinLog \Exception \BinLogException ;
66use MySQLReplication \Config \Config ;
7- use MySQLReplication \Repository \MySQLRepository ;
87use MySQLReplication \Definitions \ConstCapabilityFlags ;
98use MySQLReplication \Definitions \ConstCommand ;
10- use MySQLReplication \Gtid \GtidCollection ;
9+ use MySQLReplication \Gtid \GtidService ;
10+ use MySQLReplication \Repository \MySQLRepository ;
1111
1212/**
1313 * Class BinLogConnect
@@ -35,10 +35,6 @@ class BinLogConnect
3535 * @var BinLogAuth
3636 */
3737 private $ packAuth ;
38- /**
39- * @var GtidCollection
40- */
41- private $ gtidCollection ;
4238 /**
4339 * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
4440 * @var array
@@ -49,24 +45,21 @@ class BinLogConnect
4945 * @param Config $config
5046 * @param MySQLRepository $mySQLRepository
5147 * @param BinLogAuth $packAuth
52- * @param GtidCollection $gtidCollection
48+ * @param GtidService $gtidService
5349 */
5450 public function __construct (
5551 Config $ config ,
5652 MySQLRepository $ mySQLRepository ,
5753 BinLogAuth $ packAuth ,
58- GtidCollection $ gtidCollection
54+ GtidService $ gtidService
5955 ) {
6056 $ this ->mySQLRepository = $ mySQLRepository ;
6157 $ this ->config = $ config ;
6258 $ this ->packAuth = $ packAuth ;
63- $ this ->gtidCollection = $ gtidCollection ;
59+ $ this ->gtidService = $ gtidService ;
6460 }
6561
66- /**
67- *
68- */
69- public function __destruct ()
62+ public function disconnect ()
7063 {
7164 if (true === $ this ->isConnected ())
7265 {
@@ -91,7 +84,6 @@ public function getCheckSum()
9184 return $ this ->checkSum ;
9285 }
9386
94-
9587 /**
9688 * @throws BinLogException
9789 */
@@ -141,9 +133,32 @@ public function getPacket($checkForOkByte = true)
141133 {
142134 $ this ->isWriteSuccessful ($ result );
143135 }
136+
144137 return $ result ;
145138 }
146139
140+ /**
141+ * @param $length
142+ * @return mixed
143+ * @throws BinLogException
144+ */
145+ private function readFromSocket ($ length )
146+ {
147+ $ received = socket_recv ($ this ->socket , $ buf , $ length , MSG_WAITALL );
148+ if ($ length === $ received )
149+ {
150+ return $ buf ;
151+ }
152+
153+ // http://php.net/manual/pl/function.socket-recv.php#47182
154+ if (0 === $ received )
155+ {
156+ throw new BinLogException ('Disconnected by remote side ' );
157+ }
158+
159+ throw new BinLogException (socket_strerror (socket_last_error ()), socket_last_error ());
160+ }
161+
147162 /**
148163 * @param $packet
149164 * @return array
@@ -170,28 +185,6 @@ public function isWriteSuccessful($packet)
170185 }
171186 }
172187
173- /**
174- * @param $length
175- * @return mixed
176- * @throws BinLogException
177- */
178- private function readFromSocket ($ length )
179- {
180- $ received = socket_recv ($ this ->socket , $ buf , $ length , MSG_WAITALL );
181- if ($ length === $ received )
182- {
183- return $ buf ;
184- }
185-
186- // http://php.net/manual/pl/function.socket-recv.php#47182
187- if (0 === $ received )
188- {
189- throw new BinLogException ('Disconnected by remote side ' );
190- }
191-
192- throw new BinLogException (socket_strerror (socket_last_error ()), socket_last_error ());
193- }
194-
195188 /**
196189 * @throws BinLogException
197190 */
@@ -231,49 +224,16 @@ private function getBinlogStream()
231224 $ this ->execute ('SET @master_binlog_checksum=@@global.binlog_checksum ' );
232225 }
233226
234- if (0 === $ this ->gtidCollection ->count ())
235- {
236- $ binFilePos = $ this ->config ->getBinLogPosition ();
237- $ binFileName = $ this ->config ->getBinLogFileName ();
227+ $ this ->registerSlave ();
238228
239- if ('' !== $ this ->config ->getMariaDbGtid ())
240- {
241- $ this ->execute ("SET @mariadb_slave_capability = 4 " );
242- $ this ->execute ("SET @slave_connect_state = ' " . $ this ->config ->getMariaDbGtid () . "' " );
243- $ this ->execute ("SET @slave_gtid_strict_mode = 0 " );
244- $ this ->execute ("SET @slave_gtid_ignore_duplicates = 0 " );
245- }
246-
247- if ('' === $ binFilePos || '' === $ binFileName )
248- {
249- $ master = $ this ->mySQLRepository ->getMasterStatus ();
250- $ binFilePos = $ master ['Position ' ];
251- $ binFileName = $ master ['File ' ];
252- }
253-
254- $ prelude = pack ('i ' , strlen ($ binFileName ) + 11 ) . chr (ConstCommand::COM_BINLOG_DUMP );
255- $ prelude .= pack ('I ' , $ binFilePos );
256- $ prelude .= pack ('v ' , 0 );
257- $ prelude .= pack ('I ' , $ this ->config ->getSlaveId ());
258- $ prelude .= $ binFileName ;
229+ if ('' !== $ this ->config ->getGtid ())
230+ {
231+ $ this ->setBinLogDumpGtid ();
259232 }
260233 else
261234 {
262- $ prelude = pack ('l ' , 26 + $ this ->gtidCollection ->getEncodedLength ()) . chr (ConstCommand::COM_BINLOG_DUMP_GTID );
263- $ prelude .= pack ('S ' , 0 );
264- $ prelude .= pack ('I ' , $ this ->config ->getSlaveId ());
265- $ prelude .= pack ('I ' , 3 );
266- $ prelude .= chr (0 );
267- $ prelude .= chr (0 );
268- $ prelude .= chr (0 );
269- $ prelude .= BinaryDataReader::pack64bit (4 );
270-
271- $ prelude .= pack ('I ' , $ this ->gtidCollection ->getEncodedLength ());
272- $ prelude .= $ this ->gtidCollection ->getEncoded ();
235+ $ this ->setBinLogDump ();
273236 }
274-
275- $ this ->writeToSocket ($ prelude );
276- $ this ->getPacket ();
277237 }
278238
279239 /**
@@ -288,4 +248,81 @@ private function execute($sql)
288248 $ this ->writeToSocket ($ prelude . $ sql );
289249 $ this ->getPacket ();
290250 }
251+
252+ /**
253+ * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
254+ * @throws BinLogException
255+ */
256+ private function registerSlave ()
257+ {
258+ $ prelude = pack ('l ' , 18 ) . chr (ConstCommand::COM_REGISTER_SLAVE );
259+ $ prelude .= pack ('I ' , $ this ->config ->getSlaveId ());
260+ $ prelude .= chr (0 );
261+ $ prelude .= chr (0 );
262+ $ prelude .= chr (0 );
263+ $ prelude .= pack ('s ' , '' );
264+ $ prelude .= pack ('I ' , 0 );
265+ $ prelude .= pack ('I ' , 0 );
266+
267+ $ this ->writeToSocket ($ prelude );
268+ $ this ->getPacket ();
269+ }
270+
271+ /**
272+ * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
273+ * @throws BinLogException
274+ */
275+ private function setBinLogDumpGtid ()
276+ {
277+ $ collection = $ this ->gtidService ->makeCollectionFromString ($ this ->config ->getGtid ());
278+
279+ $ prelude = pack ('l ' , 26 + $ collection ->getEncodedLength ()) . chr (ConstCommand::COM_BINLOG_DUMP_GTID );
280+ $ prelude .= pack ('S ' , 0 );
281+ $ prelude .= pack ('I ' , $ this ->config ->getSlaveId ());
282+ $ prelude .= pack ('I ' , 3 );
283+ $ prelude .= chr (0 );
284+ $ prelude .= chr (0 );
285+ $ prelude .= chr (0 );
286+ $ prelude .= BinaryDataReader::pack64bit (4 );
287+
288+ $ prelude .= pack ('I ' , $ collection ->getEncodedLength ());
289+ $ prelude .= $ collection ->getEncoded ();
290+
291+ $ this ->writeToSocket ($ prelude );
292+ $ this ->getPacket ();
293+ }
294+
295+ /**
296+ * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
297+ * @throws BinLogException
298+ */
299+ private function setBinLogDump ()
300+ {
301+ $ binFilePos = $ this ->config ->getBinLogPosition ();
302+ $ binFileName = $ this ->config ->getBinLogFileName ();
303+
304+ if ('' !== $ this ->config ->getMariaDbGtid ())
305+ {
306+ $ this ->execute ('SET @mariadb_slave_capability = 4 ' );
307+ $ this ->execute ('SET @slave_connect_state = \'' . $ this ->config ->getMariaDbGtid () . '\'' );
308+ $ this ->execute ('SET @slave_gtid_strict_mode = 0 ' );
309+ $ this ->execute ('SET @slave_gtid_ignore_duplicates = 0 ' );
310+ }
311+
312+ if ('' === $ binFilePos || '' === $ binFileName )
313+ {
314+ $ master = $ this ->mySQLRepository ->getMasterStatus ();
315+ $ binFilePos = $ master ['Position ' ];
316+ $ binFileName = $ master ['File ' ];
317+ }
318+
319+ $ prelude = pack ('i ' , strlen ($ binFileName ) + 11 ) . chr (ConstCommand::COM_BINLOG_DUMP );
320+ $ prelude .= pack ('I ' , $ binFilePos );
321+ $ prelude .= pack ('v ' , 0 );
322+ $ prelude .= pack ('I ' , $ this ->config ->getSlaveId ());
323+ $ prelude .= $ binFileName ;
324+
325+ $ this ->writeToSocket ($ prelude );
326+ $ this ->getPacket ();
327+ }
291328}
0 commit comments