diff --git a/composer.json b/composer.json index a0d44015..1204b88e 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,7 @@ "sort-packages": true }, "require": { - "php": "^5.6", + "php": ">=5.6", "amphp/amp": "^v1.2.2", "psr/log": "^1.0.2" }, diff --git a/src/Broker.php b/src/Broker.php index d3ddc7aa..f3184043 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -7,8 +7,6 @@ class Broker { - use SingletonTrait; - private $groupBrokerId = null; private $topics = []; @@ -23,6 +21,22 @@ class Broker private $config; + private static $instance = []; + + /** + * @return static + */ + public static function getInstance($instance_name = 'default') + { + + if(isset(self::$instance[$instance_name] )) { + return self::$instance[$instance_name]; + }else{ + return self::$instance[$instance_name] = new static(); + } + } + + public function setProcess(callable $process) { $this->process = $process; diff --git a/src/CommonSocket.php b/src/CommonSocket.php index 31f75a08..021d272d 100644 --- a/src/CommonSocket.php +++ b/src/CommonSocket.php @@ -3,6 +3,8 @@ abstract class CommonSocket { + use LoggerTrait; + use \Psr\Log\LoggerAwareTrait; const READ_MAX_LENGTH = 5242880; // read socket max length 5MB @@ -273,23 +275,28 @@ public function readBlocking($len) throw new \Kafka\Exception('Invalid length given, it should be lesser than or equals to ' . self:: READ_MAX_LENGTH); } + //http://php.net/manpual/en/function.stream-select.ph + + try_again: + $null = null; - $read = [$this->stream]; - $readable = $this->select($read, $this->recvTimeoutSec, $this->recvTimeoutUsec); - if ($readable === false) { - $this->close(); - throw new \Kafka\Exception('Could not read ' . $len . ' bytes from stream (not readable)'); + if($this->isSocketDead()) { + $res = $this->getMetaData(); + $this->debug(json_encode($res)); + $this->debug('socket dead, reconnecting...'); + $this->reconnect(); } - if ($readable === 0) { // select timeout + $read = [$this->stream]; + $readable = $this->select($read, null, $this->recvTimeoutUsec); + + if ($readable == false) { $res = $this->getMetaData(); - $this->close(); - if (! empty($res['timed_out'])) { - throw new \Kafka\Exception('Timed out reading ' . $len . ' bytes from stream'); - } else { - throw new \Kafka\Exception('Could not read ' . $len . ' bytes from stream (not readable)'); - } + $this->debug(json_encode($res)); + $this->debug('select read socket failed try_again'); + goto try_again; } + $remainingBytes = $len; $data = $chunk = ''; while ($remainingBytes > 0) { @@ -378,4 +385,7 @@ public function writeBlocking($buf) * @return void */ abstract public function close(); + + abstract protected function isSocketDead(); + abstract protected function reconnect(); } diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index ccc81d98..169817a5 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -490,7 +490,7 @@ public function succFetchOffset($result) foreach ($consumerOffsets as $topic => $value) { foreach ($value as $partId => $offset) { if (isset($lastOffsets[$topic][$partId]) && $lastOffsets[$topic][$partId] > $offset) { - $consumerOffsets[$topic][$partId] = $offset + 1; + $consumerOffsets[$topic][$partId] = $offset; } } } @@ -557,8 +557,8 @@ public function succFetch($result, $fd) continue; } - $consumerOffset = $assign->getConsumerOffset($topic['topicName'], $part['partition']); - if ($consumerOffset === false) { + $offset = $assign->getConsumerOffset($topic['topicName'], $part['partition']); + if ($offset === false) { return; // current is rejoin.... } foreach ($part['messages'] as $message) { @@ -566,14 +566,11 @@ public function succFetch($result, $fd) //if ($this->consumer != null) { // call_user_func($this->consumer, $topic['topicName'], $part['partition'], $message); //} - $commitOffset = $message['offset']; + $offset = $message['offset'] + 1; } - $commitOffset = isset($commitOffset) ? $commitOffset : $consumerOffset - 1; - $consumerOffset = $commitOffset + 1; - - $assign->setConsumerOffset($topic['topicName'], $part['partition'], $consumerOffset); - $assign->setCommitOffset($topic['topicName'], $part['partition'], $commitOffset); + $assign->setConsumerOffset($topic['topicName'], $part['partition'], $offset); + $assign->setCommitOffset($topic['topicName'], $part['partition'], $offset); } } $this->state->succRun(\Kafka\Consumer\State::REQUEST_FETCH, $fd); diff --git a/src/ConsumerConfig.php b/src/ConsumerConfig.php index 38b19223..1eb6e1c5 100644 --- a/src/ConsumerConfig.php +++ b/src/ConsumerConfig.php @@ -4,6 +4,11 @@ /** * @method string|false ietGroupId() * @method array|false ietTopics() + * @method setMaxBytes + * @method setMaxWaitTime + * @method getMaxWaitTime + * @method getMaxBytes + * @method getOffsetReset */ class ConsumerConfig extends Config { diff --git a/src/Producer/Process.php b/src/Producer/Process.php index 4ad5314a..34eb0440 100644 --- a/src/Producer/Process.php +++ b/src/Producer/Process.php @@ -34,7 +34,7 @@ public function init() \Kafka\Protocol::init($config->getBrokerVersion(), $this->logger); // init process request - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $broker->setConfig($config); $broker->setProcess(function ($data, $fd) { $this->processRequest($data, $fd); @@ -124,7 +124,7 @@ public function syncMeta() } shuffle($brokerHost); - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); foreach ($brokerHost as $host) { $socket = $broker->getMetaConnect($host); if ($socket) { @@ -160,7 +160,7 @@ protected function processRequest($data, $fd) $this->error('Get metadata is fail, brokers or topics is null.'); $this->state->failRun(\Kafka\Producer\State::REQUEST_METADATA); } else { - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $isChange = $broker->setData($result['topics'], $result['brokers']); $this->state->succRun(\Kafka\Producer\State::REQUEST_METADATA, $isChange); } @@ -177,7 +177,7 @@ protected function processRequest($data, $fd) protected function produce() { $context = []; - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $requiredAck = \Kafka\ProducerConfig::getInstance()->getRequiredAck(); $timeout = \Kafka\ProducerConfig::getInstance()->getTimeout(); @@ -260,7 +260,7 @@ protected function stateConvert($errorCode, $context = null) protected function convertMessage($data) { $sendData = []; - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $topicInfos = $broker->getTopics(); foreach ($data as $value) { if (! isset($value['topic']) || ! trim($value['topic'])) { diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index d09096f9..182a256d 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -12,7 +12,7 @@ public function __construct() $config = \Kafka\ProducerConfig::getInstance(); \Kafka\Protocol::init($config->getBrokerVersion(), $this->logger); // init broker - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $broker->setConfig($config); $this->syncMeta(); @@ -20,7 +20,7 @@ public function __construct() public function send($data) { - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $requiredAck = \Kafka\ProducerConfig::getInstance()->getRequiredAck(); $timeout = \Kafka\ProducerConfig::getInstance()->getTimeout(); @@ -80,7 +80,7 @@ public function syncMeta() } shuffle($brokerHost); - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); foreach ($brokerHost as $host) { $socket = $broker->getMetaConnect($host, true); if ($socket) { @@ -95,7 +95,7 @@ public function syncMeta() if (! isset($result['brokers']) || ! isset($result['topics'])) { throw new \Kafka\Exception('Get metadata is fail, brokers or topics is null.'); } else { - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $broker->setData($result['topics'], $result['brokers']); } return; @@ -113,7 +113,7 @@ public function syncMeta() protected function convertMessage($data) { $sendData = []; - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(__CLASS__); $topicInfos = $broker->getTopics(); foreach ($data as $value) { if (! isset($value['topic']) || ! trim($value['topic'])) { diff --git a/src/SocketSync.php b/src/SocketSync.php index 42f5b503..f9098b4c 100644 --- a/src/SocketSync.php +++ b/src/SocketSync.php @@ -100,4 +100,27 @@ public function rewind() rewind($this->stream); } } + + /** + * reconnect the socket + * + * @access public + * @return void + */ + function reconnect() + { + $this->close(); + $this->connect(); + } + + + /** + * check the stream is close + * + * @return bool + */ + function isSocketDead() + { + return ! is_resource($this->stream) || @feof($this->stream); + } }