Skip to content
This repository was archived by the owner on Jun 10, 2022. It is now read-only.

Commit 76db54d

Browse files
committed
backport
1 parent e26a355 commit 76db54d

25 files changed

+124
-128
lines changed

.travis.yml

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@ sudo: false
33
language: php
44

55
php:
6-
- 7.1
7-
- 7.2
8-
- nightly
6+
- 5.6
7+
- 7.0
98

109
cache:
1110
directories:
@@ -21,7 +20,7 @@ script: ./vendor/bin/phpunit
2120

2221
jobs:
2322
allow_failures:
24-
- php: nightly
23+
- php: 7.0
2524

2625
include:
2726
- stage: Test
@@ -41,18 +40,6 @@ jobs:
4140
script:
4241
- ./vendor/bin/phpcs
4342

44-
- stage: Code Quality
45-
env: STATIC_ANALYSIS=1
46-
script:
47-
- ./vendor/bin/phpstan analyse -c phpstan.neon -l 1 src tests
48-
49-
- stage: Code Quality
50-
env: MUTATION_TESTING=1
51-
script:
52-
- mv ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini{.disabled,}
53-
- if [[ ! $(php -m | grep -si xdebug) ]]; then echo "xdebug required for coverage"; exit 1; fi
54-
- ./vendor/bin/infection --min-msi=49 --min-covered-msi=82 --threads=4
55-
5643
- stage: Functional tests
5744
services:
5845
- docker

src/Broker.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public function getSocket($host, $port, $modeSync)
174174
return $socket;
175175
}
176176

177-
private function judgeConnectionConfig()
177+
private function judgeConnectionConfig()
178178
{
179179
if ($this->config == null) {
180180
return null;
@@ -203,7 +203,7 @@ private function judgeConnectionConfig()
203203
return null;
204204
}
205205

206-
private function getSaslMechanismProvider()
206+
private function getSaslMechanismProvider()
207207
{
208208
$mechanism = $this->config->getSaslMechanism();
209209
$provider = null;

src/CommonSocket.php

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -107,47 +107,47 @@ public function __construct($host, $port, $config = null, $saslProvider = null)
107107
$this->port = $port;
108108
$this->config = $config;
109109
$this->saslMechanismProvider = $saslProvider;
110-
$this->loop = Loop::getInstance();
110+
$this->loop = Loop::getInstance();
111111
}
112112

113113
/**
114-
* @param float $sendTimeoutSec
114+
* @param $sendTimeoutSec
115115
*/
116-
public function setSendTimeoutSec($sendTimeoutSec)
116+
public function setSendTimeoutSec($sendTimeoutSec)
117117
{
118118
$this->sendTimeoutSec = $sendTimeoutSec;
119119
}
120120

121121
/**
122-
* @param float $sendTimeoutUsec
122+
* @param $sendTimeoutUsec
123123
*/
124-
public function setSendTimeoutUsec(float $sendTimeoutUsec)
124+
public function setSendTimeoutUsec($sendTimeoutUsec)
125125
{
126126
$this->sendTimeoutUsec = $sendTimeoutUsec;
127127
}
128128

129129
/**
130-
* @param float $recvTimeoutSec
130+
* @param $recvTimeoutSec
131131
*/
132-
public function setRecvTimeoutSec(float $recvTimeoutSec)
132+
public function setRecvTimeoutSec($recvTimeoutSec)
133133
{
134134
$this->recvTimeoutSec = $recvTimeoutSec;
135135
}
136136

137137

138138
/**
139-
* @param float $recvTimeoutUsec
139+
* @param $recvTimeoutUsec
140140
*/
141-
public function setRecvTimeoutUsec(float $recvTimeoutUsec)
141+
public function setRecvTimeoutUsec($recvTimeoutUsec)
142142
{
143143
$this->recvTimeoutUsec = $recvTimeoutUsec;
144144
}
145145

146146

147147
/**
148-
* @param int $number
148+
* @param $number
149149
*/
150-
public function setMaxWriteAttempts(int $number)
150+
public function setMaxWriteAttempts($number)
151151
{
152152
$this->maxWriteAttempts = $number;
153153
}
@@ -158,7 +158,7 @@ public function setMaxWriteAttempts(int $number)
158158
* @access public
159159
* @return void
160160
*/
161-
protected function createStream()
161+
protected function createStream()
162162
{
163163
if (empty($this->host)) {
164164
throw new \Kafka\Exception('Cannot open null host.');
@@ -321,7 +321,7 @@ public function readBlocking($len)
321321
* @return integer
322322
* @throws \Kafka\Exception
323323
*/
324-
public function writeBlocking($buf)
324+
public function writeBlocking($buf)
325325
{
326326
$write = [$this->stream];
327327

@@ -377,5 +377,5 @@ public function writeBlocking($buf)
377377
* @access public
378378
* @return void
379379
*/
380-
abstract public function close() ;
380+
abstract public function close();
381381
}

src/Config.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public function setBrokerVersion($version)
9494
{
9595
$version = trim($version);
9696
if ($version == '' || version_compare($version, '0.8.0') < 0) {
97-
throw new \Kafka\Exception\Config('Set broker version value is invalid, must is not empty string and gt 0.8.0.');
97+
throw new \Kafka\Exception\Config('Set broker version value is invalid, must is not empty and gt 0.8.0.');
9898
}
9999
static::$options['brokerVersion'] = $version;
100100
}
@@ -169,31 +169,31 @@ public function setMetadataMaxAgeMs($metadataMaxAgeMs)
169169
static::$options['metadataMaxAgeMs'] = $metadataMaxAgeMs;
170170
}
171171

172-
public function setSslLocalCert(string $localCert)
172+
public function setSslLocalCert($localCert)
173173
{
174174
if (! file_exists($localCert) || ! is_file($localCert)) {
175175
throw new \Kafka\Exception\Config('Set ssl local cert file is invalid');
176176
}
177177
static::$options['sslLocalCert'] = $localCert;
178178
}
179179

180-
public function setSslLocalPk(string $localPk)
180+
public function setSslLocalPk($localPk)
181181
{
182182
if (! file_exists($localPk) || ! is_file($localPk)) {
183183
throw new \Kafka\Exception\Config('Set ssl local private key file is invalid');
184184
}
185185
static::$options['sslLocalPk'] = $localPk;
186186
}
187187

188-
public function setSslCafile(string $cafile)
188+
public function setSslCafile($cafile)
189189
{
190190
if (! file_exists($cafile) || ! is_file($cafile)) {
191191
throw new \Kafka\Exception\Config('Set ssl ca file is invalid');
192192
}
193193
static::$options['sslCafile'] = $cafile;
194194
}
195195

196-
public function setSaslKeytab(string $keytab)
196+
public function setSaslKeytab($keytab)
197197
{
198198
if (! file_exists($keytab) || ! is_file($keytab)) {
199199
throw new \Kafka\Exception\Config('Set sasl gssapi keytab file is invalid');

src/Consumer.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class Consumer
2525
public function __construct($stopStrategy = null)
2626
{
2727
$this->stopStrategy = $stopStrategy;
28-
$this->loop = Loop::getInstance();
28+
$this->loop = Loop::getInstance();
2929
}
3030

3131
/**

src/Consumer/State.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ class State
4040

4141
private $requests = self::CLEAN_REQUEST_STATE;
4242

43-
private $loop = null;
43+
private $loop = null;
4444

4545
public function init()
4646
{
47-
$this->loop = Loop::getInstance();
47+
$this->loop = Loop::getInstance();
4848
$this->callStatus = [
4949
self::REQUEST_METADATA => ['status' => self::STATUS_LOOP],
5050
self::REQUEST_GETGROUP => ['status' => self::STATUS_START],

src/Consumer/StopStrategy/Callback.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,18 @@ final class Callback implements StopStrategy
2424
private $interval;
2525
private $loop;
2626

27-
public function __construct(callable $callback, int $interval = self::DEFAULT_INTERVAL)
27+
public function __construct($callback, $interval = self::DEFAULT_INTERVAL)
2828
{
2929
$this->callback = $callback;
3030
$this->interval = $interval;
31-
$this->loop = Loop::getInstance();
31+
$this->loop = Loop::getInstance();
3232
}
3333

3434
public function setup(Consumer $consumer)
3535
{
3636
$this->loop->repeat(
3737
$this->interval,
38-
function (string $watcherId) use ($consumer) {
38+
function ($watcherId) use ($consumer) {
3939
$shouldStop = (bool) call_user_func($this->callback);
4040

4141
if (! $shouldStop) {

src/Consumer/StopStrategy/Delay.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ final class Delay implements StopStrategy
1515
private $delay;
1616
private $loop;
1717

18-
public function __construct(int $delay)
18+
public function __construct($delay)
1919
{
2020
$this->delay = $delay;
21-
$this->loop = Loop::getInstance();
21+
$this->loop = Loop::getInstance();
2222
}
2323

2424
public function setup(Consumer $consumer)

src/Loop.php

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,94 +1,92 @@
11
<?php
22
namespace Kafka;
33

4-
use Amp as AmpLoop;
5-
64
class Loop
75
{
8-
use \Kafka\SingletonTrait;
6+
use \Kafka\SingletonTrait;
97

108
private $watchers = [];
119

12-
public function repeat(int $interval, callable $callback)
10+
public function repeat($interval, callable $callback)
1311
{
14-
$watcherId = AmpLoop::repeat($interval, $callback);
12+
$watcherId = \Amp\repeat($callback, $interval);
1513
$this->addWatcher($watcherId);
1614
return $watcherId;
1715
}
1816

19-
public function defer(callable $callback)
17+
public function defer(callable $callback)
2018
{
21-
$watcherId = AmpLoop::defer($callback);
19+
$watcherId = \Amp\immediately($callback);
2220
$this->addWatcher($watcherId);
2321
return $watcherId;
2422
}
2523

26-
public function cancel(string $watcherId)
24+
public function cancel($watcherId)
2725
{
28-
AmpLoop::cancel($watcherId);
26+
\Amp\cancel($watcherId);
2927
$this->delWatcher($watcherId);
3028
}
3129

32-
public function disable(string $watcherId)
30+
public function disable($watcherId)
3331
{
34-
AmpLoop::disable($watcherId);
32+
\Amp\disable($watcherId);
3533
}
3634

37-
public function enable(string $watcherId)
35+
public function enable($watcherId)
3836
{
39-
AmpLoop::enable($watcherId);
37+
\Amp\enable($watcherId);
4038
}
4139

42-
public function delay(int $delay, callable $callback, $data = null)
40+
public function delay($delay, callable $callback, $data = [])
4341
{
44-
$watcherId = AmpLoop::delay($delay, $callback, $data);
42+
$watcherId = \Amp\once($callback, $delay, $data);
4543
$this->addWatcher($watcherId);
4644
return $watcherId;
4745
}
4846

49-
public function onReadable($stream, callable $callback, $data = null)
47+
public function onReadable($stream, callable $callback, $data = [])
5048
{
51-
$watcherId = AmpLoop::onReadable($stream, $callback, $data);
49+
$watcherId = \Amp\onReadable($stream, $callback, $data);
5250
$this->addWatcher($watcherId);
5351
return $watcherId;
5452
}
5553

56-
public function onWritable($stream, callable $callback, $data = null)
54+
public function onWritable($stream, callable $callback, $data = [])
5755
{
58-
$watcherId = AmpLoop::onWritable($stream, $callback, $data);
56+
$watcherId = \Amp\onWritable($stream, $callback, $data);
5957
$this->addWatcher($watcherId);
6058
return $watcherId;
6159
}
6260

63-
public function run()
61+
public function run()
6462
{
65-
$info = AmpLoop::getInfo();
63+
$info = \Amp\info();
6664
if (isset($info['running']) && $info['running'] === true) {
6765
return;
6866
}
6967

70-
AmpLoop::run();
68+
\Amp\run();
7169
}
7270

73-
public function stop()
71+
public function stop()
7472
{
75-
AmpLoop::stop();
73+
\Amp\stop();
7674
foreach ($this->watchers as $watcherId => $unused) {
7775
$this->cancel($watcherId);
7876
}
7977
}
8078

81-
public function getInfo()
79+
public function getInfo()
8280
{
83-
return AmpLoop::getInfo();
81+
return \Amp\info();
8482
}
8583

86-
private function addWatcher(string $watcherId)
84+
private function addWatcher($watcherId)
8785
{
8886
$this->watchers[$watcherId] = true;
8987
}
9088

91-
private function delWatcher(string $watcherId)
89+
private function delWatcher($watcherId)
9290
{
9391
if (! isset($this->watchers[$watcherId])) {
9492
return;

src/Producer/State.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class State
2020

2121
private $callStatus = [];
2222

23-
private $loop = null;
23+
private $loop = null;
2424

2525
private $requests = [
2626
self::REQUEST_METADATA => [],
@@ -29,7 +29,7 @@ class State
2929

3030
public function init()
3131
{
32-
$this->loop = Loop::getInstance();
32+
$this->loop = Loop::getInstance();
3333
$this->callStatus = [
3434
self::REQUEST_METADATA => [
3535
'status'=> self::STATUS_LOOP,

0 commit comments

Comments
 (0)