From d8dbea274a0e7667050335d2aafe9142116dc9ea Mon Sep 17 00:00:00 2001
From: Kevin Michel
Date: Sat, 5 Oct 2024 11:41:40 +0200
Subject: [PATCH 1/2] Propagate timeout to _retrieve_offsets

The default timeout of _retrieve_offsets is infinite, which makes the
Consumer block indefinitely even if poll was called with a timeout.

Propagate the timeout from the Consumer down to the Fetcher operations,
deducting the time already spent as each sub-operation consumes part of
the total allowed timeout.

During a `poll` operation, the timeout for positions and offsets is kept
separate from the poll timeout, because poll keeps running and
accumulating messages until the end of its timeout. We want to be able
to poll messages for a short time (or for zero time, the default) while
still allowing some delay when retrieving the positions.
---
 kafka/consumer/fetcher.py | 16 +++++++++-------
 kafka/consumer/group.py   | 23 +++++++++++++----------
 test/test_fetcher.py      |  9 +++++----
 3 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 7ff9daf7b..f5ed6f8bf 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -131,17 +131,18 @@ def send_fetches(self):
         self._clean_done_fetch_futures()
         return futures
 
-    def reset_offsets_if_needed(self, partitions):
+    def reset_offsets_if_needed(self, partitions, timeout_ms=float("inf")):
         """Lookup and set offsets for any partitions which are awaiting an
         explicit reset.
 
         Arguments:
             partitions (set of TopicPartitions): the partitions to reset
         """
+        end_time = time.time() + timeout_ms / 1000
         for tp in partitions:
             # TODO: If there are several offsets to reset, we could submit offset requests in parallel
             if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
 
     def _clean_done_fetch_futures(self):
         while True:
@@ -156,7 +157,7 @@ def in_flight_fetches(self):
         self._clean_done_fetch_futures()
         return bool(self._fetch_futures)
 
-    def update_fetch_positions(self, partitions):
+    def update_fetch_positions(self, partitions, timeout_ms=float("inf")):
         """Update the fetch positions for the provided partitions.
 
         Arguments:
@@ -167,6 +168,7 @@ def update_fetch_positions(self, partitions, timeout_ms=float("inf")):
                 partition and no reset policy is available
         """
         # reset the fetch position to the committed position
+        end_time = time.time() + timeout_ms / 1000
         for tp in partitions:
             if not self._subscriptions.is_assigned(tp):
                 log.warning("partition %s is not assigned - skipping offset"
@@ -178,12 +180,12 @@ def update_fetch_positions(self, partitions, timeout_ms=float("inf")):
                 continue
 
             if self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
             elif self._subscriptions.assignment[tp].committed is None:
                 # there's no committed position, so we need to reset with the
                 # default strategy
                 self._subscriptions.need_offset_reset(tp)
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
             else:
                 committed = self._subscriptions.assignment[tp].committed.offset
                 log.debug("Resetting offset for partition %s to the committed"
@@ -215,7 +217,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
             offsets[tp] = offsets[tp][0]
         return offsets
 
-    def _reset_offset(self, partition):
+    def _reset_offset(self, partition, timeout_ms):
         """Reset offsets for the given partition using the offset reset strategy.
 
         Arguments:
@@ -234,7 +236,7 @@ def _reset_offset(self, partition):
 
         log.debug("Resetting offset for partition %s to %s offset.",
                   partition, strategy)
-        offsets = self._retrieve_offsets({partition: timestamp})
+        offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms)
 
         if partition in offsets:
             offset = offsets[partition][0]
 
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 969969932..72546788a 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -615,7 +615,7 @@ def partitions_for_topic(self, topic):
             partitions = cluster.partitions_for_topic(topic)
         return partitions
 
-    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
+    def poll(self, timeout_ms=0, max_records=None, update_offsets=True, *, positions_timeout_ms=float("inf")):
         """Fetch data from assigned topics / partitions.
 
         Records are fetched and returned in batches by topic-partition.
@@ -656,7 +656,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
         start = time.time()
         remaining = timeout_ms
         while not self._closed:
-            records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
+            records = self._poll_once(remaining, positions_timeout_ms, max_records, update_offsets=update_offsets)
             if records:
                 return records
 
@@ -668,7 +668,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
 
         return {}
 
-    def _poll_once(self, timeout_ms, max_records, update_offsets=True):
+    def _poll_once(self, timeout_ms, positions_timeout_ms, max_records, update_offsets=True):
         """Do one round of polling. In addition to checking for new data, this does
         any needed heart-beating, auto-commits, and offset updates.
 
@@ -683,7 +683,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         # Fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
         if not self._subscription.has_all_fetch_positions():
-            self._update_fetch_positions(self._subscription.missing_fetch_positions())
+            self._update_fetch_positions(self._subscription.missing_fetch_positions(), positions_timeout_ms)
 
         # If data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately
@@ -714,7 +714,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
         return records
 
-    def position(self, partition):
+    def position(self, partition, timeout_ms=float("inf")):
         """Get the offset of the next record that will be fetched
 
         Arguments:
@@ -728,7 +728,7 @@ def position(self, partition):
         assert self._subscription.is_assigned(partition), 'Partition is not assigned'
         offset = self._subscription.assignment[partition].position
         if offset is None:
-            self._update_fetch_positions([partition])
+            self._update_fetch_positions([partition], timeout_ms)
             offset = self._subscription.assignment[partition].position
         return offset
 
@@ -1087,7 +1087,7 @@ def _use_consumer_group(self):
             return False
         return True
 
-    def _update_fetch_positions(self, partitions):
+    def _update_fetch_positions(self, partitions, timeout_ms):
         """Set the fetch position to the committed position (if there is one)
         or reset it using the offset reset policy the user has configured.
 
@@ -1099,12 +1099,13 @@ def _update_fetch_positions(self, partitions):
             NoOffsetForPartitionError: If no offset is stored for a given
                 partition and no offset reset policy is defined.
""" + end_time = time.time() + timeout_ms / 1000 # Lookup any positions for partitions which are awaiting reset (which may be the # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do # this check first to avoid an unnecessary lookup of committed offsets (which # typically occurs when the user is manually assigning partitions and managing # their own offsets). - self._fetcher.reset_offsets_if_needed(partitions) + self._fetcher.reset_offsets_if_needed(partitions, timeout_ms) if not self._subscription.has_all_fetch_positions(): # if we still don't have offsets for all partitions, then we should either seek @@ -1115,7 +1116,8 @@ def _update_fetch_positions(self, partitions): self._coordinator.refresh_committed_offsets_if_needed() # Then, do any offset lookups in case some positions are not known - self._fetcher.update_fetch_positions(partitions) + update_timeout_ms = max(0.0, 1000 * (end_time - time.time())) + self._fetcher.update_fetch_positions(partitions, update_timeout_ms) def _message_generator_v2(self): timeout_ms = 1000 * (self._consumer_timeout - time.time()) @@ -1145,7 +1147,8 @@ def _message_generator(self): # Fetch offsets for any subscribed partitions that we arent tracking yet if not self._subscription.has_all_fetch_positions(): partitions = self._subscription.missing_fetch_positions() - self._update_fetch_positions(partitions) + update_timeout_ms = max(0.0, 1000 * (self._consumer_timeout - time.time())) + self._update_fetch_positions(partitions, update_timeout_ms) poll_ms = min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms']) self._client.poll(timeout_ms=poll_ms) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 697f8be1f..9b584f860 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -4,6 +4,7 @@ import pytest from collections import OrderedDict +from unittest.mock import ANY import itertools import time @@ -114,11 +115,11 @@ def test_update_fetch_positions(fetcher, topic, mocker): # partition needs reset, no committed offset fetcher._subscriptions.need_offset_reset(partition) fetcher._subscriptions.assignment[partition].awaiting_reset = False - fetcher.update_fetch_positions([partition]) - fetcher._reset_offset.assert_called_with(partition) + fetcher.update_fetch_positions([partition], timeout_ms=1234) + fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY) assert fetcher._subscriptions.assignment[partition].awaiting_reset is True fetcher.update_fetch_positions([partition]) - fetcher._reset_offset.assert_called_with(partition) + fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY) # partition needs reset, has committed offset fetcher._reset_offset.reset_mock() @@ -139,7 +140,7 @@ def test__reset_offset(fetcher, mocker): mocked = mocker.patch.object(fetcher, '_retrieve_offsets') mocked.return_value = {tp: (1001, None)} - fetcher._reset_offset(tp) + fetcher._reset_offset(tp, timeout_ms=1234) assert not fetcher._subscriptions.assignment[tp].awaiting_reset assert fetcher._subscriptions.assignment[tp].position == 1001 From 8397f8e5711532307c159dfed1552c2625b94c97 Mon Sep 17 00:00:00 2001 From: Kevin Michel Date: Sun, 6 Oct 2024 00:41:04 +0200 Subject: [PATCH 2/2] Propagate timeout to ensure_coordinator_ready The default timeout of ensure_coordinator_ready is infinite, this makes the Consumer block indefinitely even if poll was called with a timeout. 
---
 kafka/consumer/group.py       | 3 ++-
 kafka/coordinator/base.py     | 8 ++++++--
 kafka/coordinator/consumer.py | 9 +++++----
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 72546788a..36d03f149 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1113,7 +1113,8 @@ def _update_fetch_positions(self, partitions, timeout_ms):
             if (self.config['api_version'] >= (0, 8, 1) and
                     self.config['group_id'] is not None):
                 # first refresh commits for all assigned partitions
-                self._coordinator.refresh_committed_offsets_if_needed()
+                refresh_timeout_ms = max(0.0, 1000 * (end_time - time.time()))
+                self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=refresh_timeout_ms)
 
             # Then, do any offset lookups in case some positions are not known
             update_timeout_ms = max(0.0, 1000 * (end_time - time.time()))
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index e71984108..ee3766bb5 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -238,13 +238,17 @@ def coordinator(self):
         else:
             return self.coordinator_id
 
-    def ensure_coordinator_ready(self):
+    def ensure_coordinator_ready(self, timeout_ms=float("inf")):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
+        end_time = time.time() + timeout_ms / 1000
         with self._client._lock, self._lock:
             while self.coordinator_unknown():
-
+                if time.time() >= end_time:
+                    raise Errors.KafkaTimeoutError(
+                        "Failed to ensure coordinator is ready in %s ms" % (timeout_ms,)
+                    )
                 # Prior to 0.8.2 there was no group coordinator
                 # so we will just pick a node at random and treat
                 # it as the "coordinator"
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 971f5e802..772e175dc 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -383,17 +383,17 @@ def need_rejoin(self):
 
         return super(ConsumerCoordinator, self).need_rejoin()
 
-    def refresh_committed_offsets_if_needed(self):
+    def refresh_committed_offsets_if_needed(self, timeout_ms=float("inf")):
         """Fetch committed offsets for assigned partitions."""
         if self._subscription.needs_fetch_committed_offsets:
-            offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions())
+            offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions(), timeout_ms=timeout_ms)
             for partition, offset in six.iteritems(offsets):
                 # verify assignment is still active
                 if self._subscription.is_assigned(partition):
                     self._subscription.assignment[partition].committed = offset
             self._subscription.needs_fetch_committed_offsets = False
 
-    def fetch_committed_offsets(self, partitions):
+    def fetch_committed_offsets(self, partitions, timeout_ms=float("inf")):
        """Fetch the current committed offsets for specified partitions
 
         Arguments:
@@ -405,8 +405,9 @@ def fetch_committed_offsets(self, partitions):
         if not partitions:
             return {}
 
+        end_time = time.time() + timeout_ms / 1000
         while True:
-            self.ensure_coordinator_ready()
+            self.ensure_coordinator_ready(timeout_ms=max(0.0, 1000 * (end_time - time.time())))
 
             # contact coordinator to fetch committed offsets
             future = self._send_offset_fetch_request(partitions)
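
Taken together, the two patches apply the same deadline-propagation idea
at every level: compute end_time once at the entry point, then hand each
sub-operation only the time that is still left, clamped at zero. Below is
a rough usage-level sketch of that pattern with made-up step functions,
not the actual consumer API.

    import time

    def remaining_ms(end_time):
        # Clamp at zero so a sub-operation never receives a negative timeout.
        return max(0.0, 1000 * (end_time - time.time()))

    def slow_step(name, timeout_ms):
        # Stand-in for reset/refresh/lookup steps: consumes up to 50 ms
        # of whatever budget it is given.
        time.sleep(min(0.05, timeout_ms / 1000))

    def update_positions(partitions, timeout_ms=float("inf")):
        end_time = time.time() + timeout_ms / 1000
        # Each step sees a smaller and smaller budget as time is spent.
        slow_step("reset offsets", remaining_ms(end_time))
        slow_step("refresh committed offsets", remaining_ms(end_time))
        slow_step("lookup offsets", remaining_ms(end_time))

    update_positions(["topic-0"], timeout_ms=200)

Clamping at zero rather than skipping the call means a sub-operation whose
budget is already exhausted simply runs with a zero timeout and returns or
times out immediately, rather than being silently skipped.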