From dae7a4d99d7eecefbe8a5b37160ebc1388ef9019 Mon Sep 17 00:00:00 2001 From: soul11201 Date: Fri, 2 Feb 2018 20:16:13 +0800 Subject: [PATCH 1/5] Wrong add 1 to offset --- src/Consumer/Process.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index 1687bea7..d39119f0 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -550,7 +550,7 @@ public function succFetchOffset(array $result): void foreach ($consumerOffsets as $topic => $value) { foreach ($value as $partId => $offset) { if (isset($lastOffsets[$topic][$partId]) && $lastOffsets[$topic][$partId] > $offset) { - $consumerOffsets[$topic][$partId] = $offset + 1; + $consumerOffsets[$topic][$partId] = $offset; } } } From e74fd00087d45b1d28ac52e91fed99d7a932df40 Mon Sep 17 00:00:00 2001 From: soul11201 Date: Fri, 2 Feb 2018 20:52:01 +0800 Subject: [PATCH 2/5] commit the offset which should be consume next time --- src/Consumer/Process.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index d39119f0..7a64d67f 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -647,7 +647,7 @@ public function succFetch(array $result, int $fd): void $offset = $message['offset']; } - $consumerOffset = ($part['highwaterMarkOffset'] > $offset) ? ($offset + 1) : $offset; + $consumerOffset = ($offset + 1); $assign->setConsumerOffset($topic['topicName'], $part['partition'], $consumerOffset); $assign->setCommitOffset($topic['topicName'], $part['partition'], $offset); } From 6c83884b591ab1c9fc236840b0acf87f92c8bf99 Mon Sep 17 00:00:00 2001 From: soul11201 Date: Fri, 2 Feb 2018 22:01:32 +0800 Subject: [PATCH 3/5] make a try --- src/Consumer/Process.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index 7a64d67f..f0734566 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -550,7 +550,7 @@ public function succFetchOffset(array $result): void foreach ($consumerOffsets as $topic => $value) { foreach ($value as $partId => $offset) { if (isset($lastOffsets[$topic][$partId]) && $lastOffsets[$topic][$partId] > $offset) { - $consumerOffsets[$topic][$partId] = $offset; + $consumerOffsets[$topic][$partId] = $offset + 1; } } } From 43f5da5b3b07ea34405f1c470ad2b84eea379bd7 Mon Sep 17 00:00:00 2001 From: soul11201 Date: Fri, 2 Feb 2018 22:12:15 +0800 Subject: [PATCH 4/5] Revert "make a try" This reverts commit 6c83884b591ab1c9fc236840b0acf87f92c8bf99. --- src/Consumer/Process.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index f0734566..7a64d67f 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -550,7 +550,7 @@ public function succFetchOffset(array $result): void foreach ($consumerOffsets as $topic => $value) { foreach ($value as $partId => $offset) { if (isset($lastOffsets[$topic][$partId]) && $lastOffsets[$topic][$partId] > $offset) { - $consumerOffsets[$topic][$partId] = $offset + 1; + $consumerOffsets[$topic][$partId] = $offset; } } } From b327fdaf67054f8965712fa0cfc20597d78ea683 Mon Sep 17 00:00:00 2001 From: soul11201 Date: Fri, 2 Feb 2018 22:12:58 +0800 Subject: [PATCH 5/5] Make a try here --- src/Consumer/Process.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index 7a64d67f..48b2fff0 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -644,10 +644,10 @@ public function succFetch(array $result, int $fd): void foreach ($part['messages'] as $message) { $this->messages[$topic['topicName']][$part['partition']][] = $message; - $offset = $message['offset']; + $offset = $message['offset'] + 1; } - $consumerOffset = ($offset + 1); + $consumerOffset = $offset; $assign->setConsumerOffset($topic['topicName'], $part['partition'], $consumerOffset); $assign->setCommitOffset($topic['topicName'], $part['partition'], $offset); }