Skip to content

Commit 8c2fd85

Browse files
committed
current changes
1 parent 242a46d commit 8c2fd85

File tree

12 files changed

+151
-30
lines changed

12 files changed

+151
-30
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
- Fixed events with dropped columns will return a proper columns amount
2424
- Changed configuration to static calls
2525
- Removed absolute method getConnection from repository
26+
- Added Heartbeat period and event support
2627

2728
## v2.2.0 (2017-03-10)
2829
- Removed foreign keys from events

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ Available options:
8888

8989
'custom' - if some params must be set in extended/implemented own classes
9090

91+
'heartbeatPeriod' - sets the interval in seconds between replication heartbeats. Whenever the master's binary log is updated with an event, the waiting period for the next heartbeat is reset. interval is a decimal value having the range 0 to 4294967 seconds and a resolution in milliseconds; the smallest nonzero value is 0.001. Heartbeats are sent by the master only if there are no unsent events in the binary log file for a period longer than interval.
92+
9193
Examples
9294
=========
9395

composer.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717
"ext-sockets": "*",
1818
"doctrine/dbal": "^2.5",
1919
"doctrine/collections": "^1.3",
20-
"symfony/event-dispatcher": ">=2.8 ^3.1",
21-
"symfony/dependency-injection": ">=2.8 ^3.1",
20+
"symfony/event-dispatcher": "^2.8|^3.1",
21+
"symfony/dependency-injection": "^2.8|^3.1",
2222
"psr/simple-cache": "^1.0"
2323
},
2424
"require-dev": {
25-
"phpunit/phpunit": ">=4.8 ^5.7"
25+
"phpunit/phpunit": "^4.8|^5.7"
2626
},
2727
"license": "MIT",
2828
"authors": [

src/MySQLReplication/BinLog/BinLogSocketConnect.php

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,9 @@ private function authenticate()
122122
for ($i = 0; $i < 23; $i++) {
123123
$data .= chr(0);
124124
}
125-
$result = sha1(Config::getPassword(), true) ^ sha1(BinLogServerInfo::getSalt() . sha1(sha1(Config::getPassword(), true), true), true);
125+
$result = sha1(Config::getPassword(), true) ^ sha1(
126+
BinLogServerInfo::getSalt() . sha1(sha1(Config::getPassword(), true), true), true
127+
);
126128

127129
$data = $data . Config::getUser() . chr(0) . chr(strlen($result)) . $result;
128130
$str = pack('L', strlen($data));
@@ -142,12 +144,22 @@ private function getBinlogStream()
142144
{
143145
$this->checkSum = $this->repository->isCheckSum();
144146
if ($this->checkSum) {
145-
$this->execute('SET @master_binlog_checksum=@@global.binlog_checksum');
147+
$this->execute('SET @master_binlog_checksum = @@global.binlog_checksum');
148+
}
149+
150+
if (0 !== Config::getHeartbeatPeriod()) {
151+
// master_heartbeat_period is nanoseconds
152+
$this->execute('SET @master_heartbeat_period = ' . Config::getHeartbeatPeriod() * 1000000000);
146153
}
147154

148155
$this->registerSlave();
149156

150-
if ('' !== Config::getGtid()) {
157+
if ('' !== Config::getMariaDbGtid()) {
158+
$this->execute('SET @mariadb_slave_capability = 4');
159+
$this->execute('SET @slave_connect_state = \'' . Config::getMariaDbGtid() . '\'');
160+
$this->execute('SET @slave_gtid_strict_mode = 0');
161+
$this->execute('SET @slave_gtid_ignore_duplicates = 0');
162+
} else if ('' !== Config::getGtid()) {
151163
$this->setBinLogDumpGtid();
152164
} else {
153165
$this->setBinLogDump();
@@ -226,13 +238,6 @@ private function setBinLogDumpGtid()
226238
*/
227239
private function setBinLogDump()
228240
{
229-
if ('' !== Config::getMariaDbGtid()) {
230-
$this->execute('SET @mariadb_slave_capability = 4');
231-
$this->execute('SET @slave_connect_state = \'' . Config::getMariaDbGtid() . '\'');
232-
$this->execute('SET @slave_gtid_strict_mode = 0');
233-
$this->execute('SET @slave_gtid_ignore_duplicates = 0');
234-
}
235-
236241
$binFilePos = Config::getBinLogPosition();
237242
$binFileName = Config::getBinLogFileName();
238243
if (0 === $binFilePos || '' === $binFileName) {

src/MySQLReplication/Config/Config.php

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ class Config
7272
* @var array
7373
*/
7474
private static $custom;
75+
/**
76+
* @var int
77+
*/
78+
private static $heartbeatPeriod;
7579

7680
/**
7781
* Config constructor.
@@ -91,6 +95,7 @@ class Config
9195
* @param array $databasesOnly
9296
* @param int $tableCacheSize
9397
* @param array $custom
98+
* @param int $heartbeatPeriod
9499
*/
95100
public function __construct(
96101
$user,
@@ -108,7 +113,8 @@ public function __construct(
108113
array $tablesOnly,
109114
array $databasesOnly,
110115
$tableCacheSize,
111-
array $custom
116+
array $custom,
117+
$heartbeatPeriod
112118
) {
113119
self::$user = $user;
114120
self::$host = $host;
@@ -126,73 +132,82 @@ public function __construct(
126132
self::$mariaDbGtid = $mariaGtid;
127133
self::$tableCacheSize = $tableCacheSize;
128134
self::$custom = $custom;
135+
self::$heartbeatPeriod = $heartbeatPeriod;
129136
}
130137

131138
/**
132139
* @throws ConfigException
133140
*/
134141
public static function validate()
135142
{
136-
if (!empty(self::$user) && false === is_string(self::$user)) {
143+
if (!empty(self::$user) && !is_string(self::$user)) {
137144
throw new ConfigException(ConfigException::USER_ERROR_MESSAGE, ConfigException::USER_ERROR_CODE);
138145
}
139146
if (!empty(self::$host)) {
140147
$ip = gethostbyname(self::$host);
141-
if (false === filter_var($ip, FILTER_VALIDATE_IP)) {
148+
if (!filter_var($ip, FILTER_VALIDATE_IP)) {
142149
throw new ConfigException(ConfigException::IP_ERROR_MESSAGE, ConfigException::IP_ERROR_CODE);
143150
}
144151
}
145-
if (!empty(self::$port) && false === filter_var(
152+
if (!empty(self::$port) && !filter_var(
146153
self::$port, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]]
147154
)
148155
) {
149156
throw new ConfigException(ConfigException::PORT_ERROR_MESSAGE, ConfigException::PORT_ERROR_CODE);
150157
}
151-
if (!empty(self::$password) && false === is_string(self::$password) && false === is_numeric(self::$password)) {
158+
if (!empty(self::$password) && !is_string(self::$password) && !is_numeric(self::$password)) {
152159
throw new ConfigException(ConfigException::PASSWORD_ERROR_MESSAGE, ConfigException::PASSWORD_ERROR_CODE);
153160
}
154-
if (!empty(self::$charset) && false === is_string(self::$charset)) {
161+
if (!empty(self::$charset) && !is_string(self::$charset)) {
155162
throw new ConfigException(ConfigException::CHARSET_ERROR_MESSAGE, ConfigException::CHARSET_ERROR_CODE);
156163
}
157-
if (!empty(self::$gtid) && false === is_string(self::$gtid)) {
164+
if (!empty(self::$gtid) && !is_string(self::$gtid)) {
158165
foreach (explode(',', self::$gtid) as $gtid) {
159-
if (false === (bool)preg_match(
160-
'/^([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})((?::[0-9-]+)+)$/', $gtid, $matches
161-
)
166+
if (!(bool)preg_match(
167+
'/^([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})((?::[0-9-]+)+)$/', $gtid, $matches
168+
)
162169
) {
163170
throw new ConfigException(ConfigException::GTID_ERROR_MESSAGE, ConfigException::GTID_ERROR_CODE);
164171
}
165172
}
166173
}
167-
if (!empty(self::$slaveId) && false === filter_var(
174+
if (!empty(self::$slaveId) && !filter_var(
168175
self::$slaveId, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]]
169176
)
170177
) {
171178
throw new ConfigException(ConfigException::SLAVE_ID_ERROR_MESSAGE, ConfigException::SLAVE_ID_ERROR_CODE);
172179
}
173-
if (!empty(self::$binLogFileName) && false === is_string(self::$binLogFileName)) {
180+
if (!empty(self::$binLogFileName) && !is_string(self::$binLogFileName)) {
174181
throw new ConfigException(
175182
ConfigException::BIN_LOG_FILE_NAME_ERROR_MESSAGE, ConfigException::BIN_LOG_FILE_NAME_ERROR_CODE
176183
);
177184
}
178-
if (!empty(self::$binLogPosition) && false === filter_var(
185+
if (!empty(self::$binLogPosition) && !filter_var(
179186
self::$binLogPosition, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]]
180187
)
181188
) {
182189
throw new ConfigException(
183190
ConfigException::BIN_LOG_FILE_POSITION_ERROR_MESSAGE, ConfigException::BIN_LOG_FILE_POSITION_ERROR_CODE
184191
);
185192
}
186-
if (!empty(self::$mariaDbGtid) && false === is_string(self::$mariaDbGtid)) {
193+
if (!empty(self::$mariaDbGtid) && !is_string(self::$mariaDbGtid)) {
187194
throw new ConfigException(
188195
ConfigException::MARIADBGTID_ERROR_MESSAGE, ConfigException::MARIADBGTID_ERROR_CODE
189196
);
190197
}
191-
if (false === filter_var(self::$tableCacheSize, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]])) {
198+
if (!filter_var(self::$tableCacheSize, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]])) {
192199
throw new ConfigException(
193200
ConfigException::TABLE_CACHE_SIZE_ERROR_MESSAGE, ConfigException::TABLE_CACHE_SIZE_ERROR_CODE
194201
);
195202
}
203+
if (0 !== self::$heartbeatPeriod && !filter_var(
204+
self::$heartbeatPeriod, FILTER_VALIDATE_INT, ['options' => ['min_range' => 1, 'max_range' => 4294967]]
205+
)
206+
) {
207+
throw new ConfigException(
208+
ConfigException::HEARTBEAT_PERIOD_ERROR_MESSAGE, ConfigException::HEARTBEAT_PERIOD_ERROR_CODE
209+
);
210+
}
196211
}
197212

198213
/**
@@ -357,4 +372,12 @@ public static function checkEvent($type)
357372

358373
return true;
359374
}
375+
376+
/**
377+
* @return int
378+
*/
379+
public static function getHeartbeatPeriod()
380+
{
381+
return self::$heartbeatPeriod;
382+
}
360383
}

src/MySQLReplication/Config/ConfigBuilder.php

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ class ConfigBuilder
7272
* @var array
7373
*/
7474
private $custom = [];
75+
/**
76+
* @var int
77+
*/
78+
private $heartbeatPeriod = 0;
7579

7680
/**
7781
* @param string $user
@@ -231,18 +235,36 @@ public function withMariaDbGtid($mariaDbGtid)
231235

232236
/**
233237
* @param int $tableCacheSize
238+
* @return $this
234239
*/
235240
public function withTableCacheSize($tableCacheSize)
236241
{
237242
$this->tableCacheSize = $tableCacheSize;
243+
244+
return $this;
238245
}
239246

240247
/**
241248
* @param array $custom
249+
* @return $this
242250
*/
243251
public function withCustom(array $custom)
244252
{
245253
$this->custom = $custom;
254+
255+
return $this;
256+
}
257+
258+
/**
259+
* @see https://dev.mysql.com/doc/refman/5.6/en/change-master-to.html
260+
* @param int $heartbeatPeriod
261+
* @return $this
262+
*/
263+
public function withHeartbeatPeriod($heartbeatPeriod)
264+
{
265+
$this->heartbeatPeriod = $heartbeatPeriod;
266+
267+
return $this;
246268
}
247269

248270
/**
@@ -266,7 +288,8 @@ public function build()
266288
$this->tablesOnly,
267289
$this->databasesOnly,
268290
$this->tableCacheSize,
269-
$this->custom
291+
$this->custom,
292+
$this->heartbeatPeriod
270293
);
271294
}
272295
}

src/MySQLReplication/Definitions/ConstEventType.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
/**
66
* Class ConstEventType
77
* @package MySQLReplication\Definitions
8-
* @see https://dev.mysql.com/doc/internals/en/binlog-event-type.html
8+
* @see https://dev.mysql.com/doc/internals/en/event-classes-and-types.html
99
*/
1010
class ConstEventType
1111
{

src/MySQLReplication/Definitions/ConstEventsNames.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ class ConstEventsNames
1818
const WRITE = 'write';
1919
const MARIADB_GTID = 'mariadb gtid';
2020
const FORMAT_DESCRIPTION = 'format description';
21+
const HEARTBEAT = 'heartbeat';
2122
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?php
2+
3+
namespace MySQLReplication\Event\DTO;
4+
5+
use MySQLReplication\Definitions\ConstEventsNames;
6+
7+
/**
8+
* Class HeartbeatDTO
9+
* @package MySQLReplication\Event\DTO
10+
*/
11+
class HeartbeatDTO extends EventDTO
12+
{
13+
/**
14+
* @var string
15+
*/
16+
protected $type = ConstEventsNames::HEARTBEAT;
17+
18+
/**
19+
* @return string
20+
*/
21+
public function getType()
22+
{
23+
return $this->type;
24+
}
25+
26+
/**
27+
* @return string
28+
*/
29+
public function __toString()
30+
{
31+
return PHP_EOL .
32+
'=== Event ' . $this->getType() . ' === ' . PHP_EOL .
33+
'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL .
34+
'Log position: ' . $this->eventInfo->getPos() . PHP_EOL .
35+
'Event size: ' . $this->eventInfo->getSize() . PHP_EOL;
36+
}
37+
38+
/**
39+
* Specify data which should be serialized to JSON
40+
* @link http://php.net/manual/en/jsonserializable.jsonserialize.php
41+
* @return mixed data which can be serialized by <b>json_encode</b>,
42+
* which is a value of any type other than a resource.
43+
* @since 5.4.0
44+
*/
45+
public function jsonSerialize()
46+
{
47+
return get_object_vars($this);
48+
}
49+
}

src/MySQLReplication/Event/Event.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use MySQLReplication\Definitions\ConstEventsNames;
1212
use MySQLReplication\Definitions\ConstEventType;
1313
use MySQLReplication\Event\DTO\FormatDescriptionEventDTO;
14+
use MySQLReplication\Event\DTO\HeartbeatDTO;
1415
use MySQLReplication\Event\RowEvent\RowEventFactory;
1516
use MySQLReplication\Exception\MySQLReplicationException;
1617
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
@@ -152,6 +153,10 @@ public function consume()
152153
$this->eventDispatcher->dispatch(
153154
ConstEventsNames::FORMAT_DESCRIPTION, new FormatDescriptionEventDTO($eventInfo)
154155
);
156+
} elseif (ConstEventType::HEARTBEAT_LOG_EVENT === $eventInfo->getType()) {
157+
$this->eventDispatcher->dispatch(
158+
ConstEventsNames::HEARTBEAT, new HeartbeatDTO($eventInfo)
159+
);
155160
}
156161
}
157162
}

0 commit comments

Comments
 (0)