diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index 1687bea7..48b2fff0 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -550,7 +550,7 @@ public function succFetchOffset(array $result): void foreach ($consumerOffsets as $topic => $value) { foreach ($value as $partId => $offset) { if (isset($lastOffsets[$topic][$partId]) && $lastOffsets[$topic][$partId] > $offset) { - $consumerOffsets[$topic][$partId] = $offset + 1; + $consumerOffsets[$topic][$partId] = $offset; } } } @@ -644,10 +644,10 @@ public function succFetch(array $result, int $fd): void foreach ($part['messages'] as $message) { $this->messages[$topic['topicName']][$part['partition']][] = $message; - $offset = $message['offset']; + $offset = $message['offset'] + 1; } - $consumerOffset = ($part['highwaterMarkOffset'] > $offset) ? ($offset + 1) : $offset; + $consumerOffset = $offset; $assign->setConsumerOffset($topic['topicName'], $part['partition'], $consumerOffset); $assign->setCommitOffset($topic['topicName'], $part['partition'], $offset); }